NiFi & NiFi Registry on the Google Cloud Platform with Cloud Source Repositories


This post is about quickly and easily deploying an unsecured instance of NiFi and an unsecured instance of the NiFi Registry that uses the Cloud Source Repositories service as the backend for the flow persistence provider.

The objective is to quickly deploy NiFi and the NiFi Registry, connect the two together, version the workflows in the Source Repositories, and be up and running quickly to start building workflows. This is not suitable for production deployment as we are not securing the instances (I’ll talk about that in another post).

Also, this story is about a new feature in NiFi Registry 0.4.0 (NIFIREG-209) which allows the NiFi Registry to rebuild all the metadata from an existing Git repository of flows. It’s a very nice feature when you start and stop NiFi instances on the fly while still wanting easy access to your versioned flows. Actually, using this feature, we could run the NiFi Registry in Google Cloud Run and have the production instances of NiFi just pull the versions of the flows from the NiFi Registry exposed by Google Cloud Run. By doing that you would leverage the advantages of serverless. If you are interested in Google Cloud Run, you might also be interested in this post about running NiFi workflows in Cloud Run.


Setup Source Repository

I start by creating a brand new project in my Google Cloud Platform console. I call this new project ‘nifi-registry’. Once the project is created, I go into Source Repositories. If it’s your first time, click on ‘Get started’ and ‘Create repository’.

Source Repositories is the Google Cloud offer to get free unlimited private Git repositories to organize your code in a way that works best for you (you can also mirror code from GitHub or Bitbucket repositories to get powerful code search, code browsing, and diagnostics capabilities). It also nicely integrates with CI/CD tools.

In my case, I create a new repository that I call ‘nifi-flow-repository’.

Let’s now set up the SSH key to allow access to the repository.

$ ssh-keygen -t rsa -b 4096 -m PEM -C "NiFi Registry"

Generating public/private rsa key pair.
Enter file in which to save the key (~/.ssh/id_rsa): ~/temp/id_rsa
Enter passphrase (empty for no passphrase): 
Enter same passphrase again: 
Your identification has been saved in ~/temp/id_rsa.
Your public key has been saved in ~/temp/id_rsa.pub.
The key fingerprint is:
SHA256:/hu6FZLvcvDigP4ixPFPEGzfpY1RHOoTRytjqYOoYHo NiFi Registry
The key's randomart image is:
+---[RSA 4096]----+
|   .     o+.     |
|    +   .+o.     |
|   . o .B*o      |
|  .....++*.      |
|.o.o..o S .      |
|+.o ...o.+ .     |
|o.E .o. .o+      |
| .... ..o=o.     |
|   ..o..+=+.     |
+----[SHA256]-----+


$ ls
id_rsa     id_rsa.pub

For this demo to work, we generate a PEM-encoded key and we use an empty passphrase (again, this is not ideal for production). Once done, you can register the SSH key with Google Cloud (there is a link available after you have created the repository). You just have to give a name to the key and copy the content of the generated id_rsa.pub file.
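Before registering the key, it can be worth checking that the private key really is in the legacy PEM format produced by `-m PEM` and not in the newer OpenSSH format. The snippet below is only a sketch: the helper name `key_format` is hypothetical, and it looks at the header line only, not at the key material.

```python
# Hypothetical helper: it only inspects the first line of a private key.
PEM_RSA_HEADER = "-----BEGIN RSA PRIVATE KEY-----"
OPENSSH_HEADER = "-----BEGIN OPENSSH PRIVATE KEY-----"


def key_format(key_text: str) -> str:
    """Return 'pem-rsa', 'openssh' or 'unknown' based on the key header."""
    lines = key_text.strip().splitlines()
    first_line = lines[0].strip() if lines else ""
    if first_line == PEM_RSA_HEADER:
        return "pem-rsa"
    if first_line == OPENSSH_HEADER:
        return "openssh"
    return "unknown"


# Tiny demonstration with a fake (empty) key body.
sample = PEM_RSA_HEADER + "\n...\n-----END RSA PRIVATE KEY-----\n"
print(key_format(sample))
```

To check the key generated above, read `~/temp/id_rsa` into a string and pass it to `key_format`.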

Start the NiFi Registry

We can now focus on starting the NiFi Registry. To be up and running very quickly, I’m going to rely on the Docker image provided by Apache NiFi and use it in a simple Compute Engine instance with Docker enabled.

In Compute Engine / Instance templates, you can create a new template. Here is my setup with the parameters I changed (adapt it to your needs):

  • Name : nifi-registry
  • Check “Deploy a container image to this VM instance”
  • Container image : apache/nifi-registry
  • Go into “Advanced container options”
  • In the volume mounts (use Directory as volume type) :
  • Go into “Management, security, disks, networking, sole tenancy”
  • Add the below startup script :
#! /bin/bash

# This script is used when starting a docker image based GCE instance
# of the NiFi Registry. It is intended to configure the NiFi Registry
# so that the persistence provider is the Google Cloud Source Repo.

# Docker volumes (directory type)
# /tmp/config => /home/nifi/.ssh - Read only
# /tmp/ssh/id_rsa => /id_rsa - Read only
# /tmp/nifi-flow-repository => /nifi-flow-repository - Read/Write
# /tmp/providers.xml => /opt/nifi-registry/nifi-registry-0.4.0/conf/providers.xml - Read only

# Note that 1000 is uid/gid for NiFi user/group in the Docker container

# create directory for SSH keys
mkdir /tmp/ssh

# private SSH key to authenticate against the Google Cloud Source Repo
# its associated public key needs to be registered on Source Repo
# this is the content of the id_rsa file we generated, change it with yours!
touch /tmp/ssh/id_rsa && chmod 600 /tmp/ssh/id_rsa
cat <<EOF >> /tmp/ssh/id_rsa
-----BEGIN RSA PRIVATE KEY-----
MIIJJwIBAAKCAgEAxRVsCHEUnKJXZnJ8GZVb/D0bvCrzK/p8jYiuleUqCgrqJ+2D
hocKkLqPKU5yiiA11vDCgrn7GBkWUH+Naj6rc9qZviwJtuZFjfKt+gg9++JLsnQn
8TdfbZ7nKZ2fh397Yr50mLP9wJw6A0nd68Y9YowRID64ZJ+kfqtnr5gaU5wI6/j+
UEV8QO2hpQfJ+hI0TUe82kY3l21J4FcLGbKsZSwMjRDRIG9fly4adEVsPq+WZFJ1
6q8ZsGfLpArCORQiM81mBFTeeLn65FZwDEsAIko3Fv+OVBjv+YtpaUOL4QwM9E3X
9v+0WqkKG0MYm12FjHkulIdHbO9gVIXKM8sIyI+J49fxWiLnb+HiYExCvdgaKCAf
eR12zIjTXHrX8YHF5uxfE0oCMgPcZIcsSVO0zjNNKniJy4AWU46hxB8USI7Kf9jD
i/YVRmDi67mXSi6z8FjamccIWg8CnWVkLTr771ERZATZAn7YDY7Zu8Z+AXE6U12o
7hkcB940B+hHnHrB32iKiOev+NLR+NCde1YUBHfaw6vmmsBwrmO9H/cSGz9L1aC/
seSOBrnWef/YZXAXtMb1maSGpVEW+74uR05UL5Bt5xlB+MAQGRcDWkMJ08teqYGZ
IOgqqeESihXq2Suk1+nMhO/kUHdUBHVHXBmQTqtRMKjMyyRaM/rU/BwF0PECAwEA
AQKCAgBEFI1YMS/sl8vXUO77q3O2I6nkC3YrGAFUpwWgNCSciX6vwkVwAFAvBLMV
ksrQWiYfFNYscHCDt47Uyesg63swry/y2KWWb99RFKbyu0wmKdr6T1PI6NbnOEAk
SRXlqa0GxEHkyjB7C7yijC7EFpv34ei8mc36vIcHVtCfgMx/W2Rdl4rKUeaFS1FO
f+1OnGFC3OgpAp6Lia4+d/MWsnkJDflb9ZY4PSDpSXzL83tcAC3UhAPFySz3mjNn
fGvxaboS6P43utWLILvBivZz2Ki4L6juIeOZu2+SZY1JVpMIb7A96HIVGenEc6ZR
GpFyghDzIJ1to3mR8PL236ykzZs/iRbRGOvHSHNFiAmKMIXqeYe+Q6u/9mGVo68J
Vbw0PeTWaaPnnBM3iVi2d3STGsI6x2bTtosDyRS5YLNztj0Y8mOSWQz8JvnTcikM
lKBcSbA3K/ophzSyjk8sN65HqM1JQ0v/bK6w8TtZBLThPwcqRtbvYQBCQC7g74Oe
5KAuJRDTKDSxkLlD9AX0hdMIMIVKNd5h8Sx7vlX6DZ/5PdLT87eIS39mQlXNjE6W
kvGPte1YlTqWFECmxHlxvSEMB6txEzUrfEVLpTdKkpnygQFJfZdKzTiIsFiqKn9s
s6IAuzyHBJ6MMUdstZ8DMMnE/uXWU3ouUemSLcRiPMXi5AlZkQKCAQEA+EuQ75un
9PS/D6pCcOrZaodK0J2Xz4KX2VkuEJeP/KmDAFwcB0HzpQO6zQglIf8Sv9r3/g10
qk6pt5Y+gqBnpQOV/uUgN7U0lTMAnxx4l3RxXKZW3cqEYL326ouzRKDjz7TxYiWK
T+RT4QlK0jm7NfqZ8trqwi4dB4Nz5JA4MItoSinyWYzRfnaNH8jBxYmRyDGWQLsq
y2Dep6AZ2GPez473xl6TdwTc47UnDYbyuXdhK4yV+Jr9quYUkda+gFVlh7ZEK/vi
WsBoh0IjG07C4m8rNdxHnfplH9MIFo+I2u+hk6Ot4hpTc9b+Vj93FidSnboOEjDh
YD6MM1A8gVJdtQKCAQEAyzMJVHOAWq0F14h6vUY9m8lkw3KBZorfTsDD39kG3P7C
lh/VCCqk7Uz6drkPc6Gb/xp1LgCDhdxxNISos/92IINFwTmg1bQpaCBsfgkM+r2g
S87hUdCL3NCLwsrS67oQjalEAVu8TZ2obCZ48EgasiY48tq7SrQ7+tW7PSjnO5pv
pM661m5Gmp0/y7+0CkPg2JyGp4mwlYY5PgHUckpjwl7dHiDqTSYypOH0rX8i0UF7
NlOpWIFK1/o3K5YzenJwLIm9tdhG+AJ4ZXUOL+ihoUp+uyac33jCAsg34n2spEgi
F/5V8SZ/oYImBeqZJBSf/eOllAVQMHoNyU3F7SILzQKCAQAul6+I5wKresnnnKF/
IvLNhLnLT+dO70ijZpK4VliUpxKIHMC9s+iOjJafJuog6QaRrftfVxMPald6tdzh
EkygsH2TKVfUXFKTtNBnCyat9RHYuvYOBJS2uq788F4hgLd/IIszSawctdHvppi9
vkudI3uEEQSAj7qu0EINH+sLYP2e/SQXHc+qFYEB9+A0u0357SQu3XB4XaMqfWac
LpF/DWr9dH3jlawFpta/ORWPLBG15Fm4Hw1+5lHx1ARHfL7iqpc8UbX2Jaj3yLdh
xnPXndjT8JQX1wbm4+jeouyheNovJEXa6enDERMFCD/GjnZ4VpORYk7IirQwZNwV
wGkJAoIBACYlU5gOAseC+bjHfzsvB3vKZ+clBNPKODehimPoaxhrnv3txeE9mC6Q
J+jHvvXXHeDbB6p2IDqt8naIfN8lkvhxjFPEzMOxiaBpjdRvQIeFt6Tjlnr0an0u
jT4pM0VbbaYaY5DZttTfRvHemw+IibJt6Hz2wPg6M5RYMUk+94HB8TmAMaT1mL2H
zaUjPNo8eeZQJBsphwPa6b4RO9+kxWuEwX/ZcALBq/o2DOfRGSktYMMHG6BozTMU
Xu0IymsvNo214e5URqZiWFW7jslBo64SvQ3HJuXw1oMNMSiMrS8992CHt3yI3Kbm
NtsfelZCpPJVnQzXnoErOJFUz1Y/8PUCggEASI2Uky/TDLCU5Inm2hN1zzozF/kh
pvKks4Kd93BSiMWDHg07HtfkGPBINQdwGScLuTK63uHEuVTB1e1VM+Kd+FqoddoA
xEfdFKoDMgyhE2KNg9iiePbkeXzCCY9yFD2xQ+zfMdqd/755Fg356IfETSx59WzO
vNTQShrwj5zOE1H3wl+YYsAWA1AiHslzwVp5pO26KhQLMO90q5j2g7gJEWOfhUDi
bkhJu3rXdJpiYOCngy0vKiSFYrvFfYh4hZ4+mL/TzWIuJtC0zmSetk1VwOYBkwJZ
Uo1bAaMdFJMC/QAWiJv95jGuusV39nT7pxd93sW1Iiv9OXnsxj8OeShEVg==
-----END RSA PRIVATE KEY-----
EOF

# clone the Google Cloud Source Repository
cd /tmp
ssh-agent bash -c 'ssh-add /tmp/ssh/id_rsa; git clone ssh://admin@pierrevillard.com@source.developers.google.com:2022/p/nifi-registry-245014/r/nifi-flow-repository'
chmod 755 /tmp/nifi-flow-repository
chown -R 1000:1000 /tmp/nifi-flow-repository

# Create the providers configuration for the NiFi Registry
# no user/password because we use SSH authentication
cat <<EOF > /tmp/providers.xml
<providers>
  <flowPersistenceProvider>
    <class>org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider</class>
    <property name="Flow Storage Directory">/nifi-flow-repository</property>
    <property name="Remote To Push">origin</property>
    <property name="Remote Access User"></property>
    <property name="Remote Access Password"></property>
  </flowPersistenceProvider>
  <extensionBundlePersistenceProvider>
    <class>org.apache.nifi.registry.provider.extension.FileSystemBundlePersistenceProvider</class>
    <property name="Extension Bundle Storage Directory">./extension_bundles</property>
  </extensionBundlePersistenceProvider>
</providers>
EOF
chown 1000:1000 /tmp/providers.xml

# .ssh/config and .ssh/known_hosts files
mkdir /tmp/config
cat <<EOF >> /tmp/config/config
Host source.developers.google.com
  HostName source.developers.google.com
  IdentityFile /id_rsa
EOF
ssh-keyscan -p 2022 source.developers.google.com >> /tmp/config/known_hosts
chown -R 1000:1000 /tmp/config

# change ownership of the private key to allow access to the 'nifi' user inside the container
chown 1000:1000 /tmp/ssh/id_rsa

Note 1 — we are using templates to get up and running very quickly each time you want to start a new instance with the same configuration.

Note 2 — the above approach is not recommended as we are copying/pasting the private key in the startup script but this is due to the restrictions coming with the Container Optimized OS used for this demo. In a better world, we would use Cloud Build to have our own NiFi Registry image and use it instead. Or we could deploy the public image on Google Kubernetes Engine and use secrets.
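The clone URL in the startup script is specific to my account and project, so you will have to adapt it. As a small sketch, the URL can be rebuilt from its parts like this. The function name `clone_url` is hypothetical, and the URL layout is simply the one used in the script above.

```python
def clone_url(user: str, project_id: str, repository: str,
              host: str = "source.developers.google.com",
              port: int = 2022) -> str:
    """Rebuild the SSH clone URL with the layout used in the startup script."""
    return f"ssh://{user}@{host}:{port}/p/{project_id}/r/{repository}"


# Values taken from the startup script above; replace them with yours.
print(clone_url("admin@pierrevillard.com",
                "nifi-registry-245014",
                "nifi-flow-repository"))
```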

Once your template is created you can open it and click “Create VM”:

Then you can give a name to your instance (let’s say ‘nifi-registry’) and start it. You should have an instance up and running:

After configuring the proper Firewall rule to allow access from your personal network to the instance on the port 18080, you should be able to access the NiFi Registry at http://<external IP>:18080/nifi-registry :

You can go in Settings (top right) and create a new bucket:

You now have a NiFi Registry up and running and you have initialized your first bucket. We can now deploy a NiFi instance, connect it with the Registry and create our first workflow.
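If you prefer scripting over clicking, the bucket could also be created over HTTP. The sketch below only builds the request and does not send it. It assumes the Registry REST API is reachable under `/nifi-registry-api` on the same port as the UI and accepts a JSON body with a `name` field on `POST /buckets`, so check the REST API documentation of your Registry version. The host and the bucket name are placeholders.

```python
import json
import urllib.request


def bucket_request(base_url: str, name: str) -> urllib.request.Request:
    """Build (but do not send) the HTTP request that would create a bucket."""
    body = json.dumps({"name": name}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/nifi-registry-api/buckets",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder host and bucket name, for illustration only.
request = bucket_request("http://localhost:18080", "demo-bucket")
print(request.get_method(), request.full_url)
# To actually send it: urllib.request.urlopen(request)
```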

Start a standalone NiFi instance

It’s very easy! Just go in Compute Engine / VM instances and click “Create instance”. Then just give a proper name to your instance and configure it to use the NiFi Docker image:

Start your VM and wait a few minutes. After configuring the Firewall rule to allow access from your personal network, you should be able to access NiFi on port 8080:

Go into the top-right hamburger menu and go into Controller Settings. Then go into the Registry Clients tab and click the + button to configure your registry:

You can now add a Process Group into the canvas, right click on it and start versioning:

You will notice that we can see the bucket we created in the registry. We can give a name to our workflow, a description, and a commit for this version.

Once we click save, we have the confirmation that the workflow has been correctly versioned:

We can check in our Cloud Source Repository that we do have data:

That’s it. You can now create a more complex workflow and commit the new version into the Registry; it will be saved into your repository. Even better, if you kill your NiFi Registry instance and start a new one, you will be able to keep working and pull all the workflows you previously stored in the repository: all the metadata will be regenerated from the repository data at startup.

There is much more to come about NiFi on Google Cloud, stay tuned! Thanks for reading and feel free to comment and/or ask questions.

Deploying NiFi workflows on Google Cloud Run

Google Cloud just launched a new managed service called Cloud Run which is currently in public beta.

Cloud Run is a managed compute platform that enables you to run stateless containers that are invocable via HTTP requests. Cloud Run is serverless: it abstracts away all infrastructure management, so you can focus on what matters most, building great applications. It is built from Knative and lets you run your containers fully managed with Cloud Run.

The huge benefit of this approach is: if there is no call to your service, there won’t be any container running and you won’t be charged. When a call is received, the container is instantiated to serve the request. If there are new requests, the container remains alive until there are no more requests to answer. Besides, in case of traffic increase, your app will scale and more containers will be instantiated.

Also, when deploying a new version (revision) of your service, requests are automatically routed as soon as possible to the latest healthy service revision. This makes the whole CI/CD and service lifecycle much easier.

I wanted to play with this new service using NiFi: the idea is to have a workflow starting with HandleHttpRequest processor and ending with HandleHttpResponse processor to easily develop “functions” and deploy the web services in Cloud Run.

Note – everything used for this demo is available on this GitHub repo.

The workflow

Here is a simple workflow I developed and deployed on Cloud Run: this is a workflow expecting CSV data as input and converting the CSV data into JSON.

I want to emphasize that this is a very simple case. But think about all the great features you can use in NiFi following this approach. Possible use cases are pretty much infinite.

Basically, once the workflow is running I can use a POST HTTP request to get my JSON:

$ cat test.csv
name,company
Pierre Villard,Google

$ curl --data-binary "@test.csv" -H "Content-Type: text/csv" -X POST http://localhost:9090/
[ {
  "name" : "Pierre Villard",
  "company" : "Google"
} ]

This is a very simple workflow:

  • HandleHttpRequest – will start listening on a given port for HTTP(S) requests
  • ConvertRecord – will convert my CSV data into JSON using the header to infer the schema
  • HandleHttpResponse – return the result to the user

Simple workflow to convert CSV to JSON using an HTTP endpoint
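To make the expected behaviour concrete, here is a rough Python equivalent of what the endpoint computes. It is not how NiFi does it: ConvertRecord relies on record readers and writers, while this sketch just uses the standard `csv` and `json` modules, keeps every value as a string, and uses a hypothetical function name.

```python
import csv
import io
import json


def csv_to_json(csv_text: str) -> str:
    """Use the CSV header as field names and emit a JSON array of objects."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return json.dumps([dict(row) for row in reader], indent=2)


print(csv_to_json("name,company\nPierre Villard,Google\n"))
```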

Considerations for the container

The container must be stateless and listen for HTTP requests on $PORT that will be passed to the container as an environment variable. Have a look at the container runtime contract.

In the NiFi world, that means that the property defining the listening port in the HandleHttpRequest processor must accept Expression Language. Unfortunately, that is not the case and I submitted a fix with NIFI-6144. However, in Cloud Run container instances, the PORT environment variable is always set to 8080 (but for portability reasons, your code should not hardcode this value). This means you don’t strictly need the fix I submitted to play with Cloud Run and NiFi, but it’s safer if you do.
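Outside of NiFi, honouring this contract is a one-liner. The sketch below shows the idea with Python’s standard library: read `PORT` from the environment and fall back to 8080. The names `listening_port`, `EchoHandler` and `serve` are hypothetical, and the handler simply echoes the request body back.

```python
import os
from http.server import BaseHTTPRequestHandler, HTTPServer


def listening_port(environ=os.environ, default: int = 8080) -> int:
    """Read the port from the PORT variable instead of hardcoding it."""
    value = environ.get("PORT")
    return int(value) if value else default


class EchoHandler(BaseHTTPRequestHandler):
    """Answer every POST with the body it received."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        self.send_response(200)
        self.send_header("Content-Type", "application/octet-stream")
        self.end_headers()
        self.wfile.write(body)


def serve() -> None:
    """Start the server on the port given by the environment (blocking)."""
    HTTPServer(("0.0.0.0", listening_port()), EchoHandler).serve_forever()


print(listening_port({"PORT": "9090"}))
```

Calling `serve()` starts the server; it is left out of the snippet so that running the file does not block.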

The container, when started, must be able to serve the request within 4 minutes, which means that we want a container able to start as quickly as possible and with a low footprint. To do that, I chose to work with the MiNiFi Docker image instead of the NiFi one.

Converting the workflow for MiNiFi

Once the workflow is developed, you can save it as a template (XML file) and use the MiNiFi toolkit to convert the XML file into a YAML file that will be usable by MiNiFi.

# command to convert the template into yml file for MiNiFi
$MINIFI_HOME/bin/config.sh transform templateConvertRecord.xml config.yml

You can have a look at the YAML file on the GitHub repository.

Building the container

By default, the MiNiFi Docker image comes with a limited set of features and it’s up to you to add the elements you need to run your workflow if required. In my case, I added:

  • a custom build of the standard processors NAR to include my fix,
  • the HTTP context NAR that is required when using HandleHttpRequest/Response processors,
  • and the Record serialization NAR required for the ConvertRecord processor.

Here is my Dockerfile:

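FROM apache/nifi-minifi:latest
USER root

# This path must match the MiNiFi version shipped in the base image
ENV MINIFI_HOME=/opt/minifi/minifi-0.5.0

# Flow definition generated with the MiNiFi toolkit
COPY config.yml $MINIFI_HOME/conf/config.yml
# Additional NARs required by the workflow
COPY *.nar $MINIFI_HOME/lib/

RUN chown -R minifi:minifi $MINIFI_HOME

USER minifi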

You then need to create your image and push it to the Container Registry service in the Google Cloud Platform. You can have a look at the documentation here.

git clone git@github.com:pvillard31/nifi-cloudrun-example.git
cd nifi-cloudrun-example
gcloud auth configure-docker
# docker build -t [HOSTNAME]/[PROJECT-ID]/[IMAGE]:[TAG] .
docker build -t eu.gcr.io/my-gcp-project/minifi-csvjson:0.0.1 .
# docker push [HOSTNAME]/[PROJECT-ID]/[IMAGE]:[TAG]
docker push eu.gcr.io/my-gcp-project/minifi-csvjson:0.0.1

Deploy in Cloud Run

Once your Docker image is pushed in the Container Registry, you can go to the Cloud Run service and deploy your service!

Click on Create Service and fill in the information:

Create a service in Cloud Run

Once your service is deployed, you will get a service HTTPS URL exposing your service:

I can now directly send requests to the exposed service:

$ time curl --data-binary "@test.csv" -H "Content-Type: text/csv" -X POST https://minifi-csvjson-t546z2l6aq-uc.a.run.app
[ {
  "name" : "Pierre Villard",
  "company" : "Google"
} ]
real	0m0.725s
user	0m0.036s
sys	0m0.026s

And I can also monitor my service in Stackdriver:

Conclusion

That’s it! You know how to deploy a new service in Cloud Run using the features of NiFi to develop your workflow and using the MiNiFi Docker image for running your containers.

Hopefully this demonstrates how easy it is to create and deploy a service and everything is managed for you in terms of scaling and lifecycle… and you’re paying only for what you really use!

In combination with the NiFi Registry and all the tools you have on the Google Cloud Platform you can have a very neat pipeline to automate all of it and deploy new services in seconds!

As always, feel free to ask questions and/or leave a comment.

NiFi 1.8+ – Revolutionizing the List/Fetch pattern and more…

If you read my post about the List/Fetch pattern and if you’re using this approach for some of your workflows, this new feature coming with NiFi 1.8.0 is going to be a revolution.

A quick recap about the context: in NiFi, unless you specifically do something to make the nodes of a NiFi cluster exchange data, a flow file will remain on the same node from the beginning to the end of the workflow. For some use cases, it is necessary to load-balance the flow files among the nodes of the cluster to take advantage of all the nodes (like the List/Fetch pattern).

Before NiFi 1.8, the only way to load balance data in a NiFi cluster was to use the Site-to-Site (S2S) protocol with a Remote Process Group (RPG) connected to the cluster itself through an input/output port. In addition, S2S forces you to define the input/output port at the root level of the canvas. In a multi-tenant environment this can be a bit annoying and can make the workflows a little more complex.

What’s the revolution with NiFi 1.8+? You can now load balance flow files within the cluster directly, by configuring it on the relationship between two processors. It could sound like a minor thing, but that’s HUGE! In addition, you have a few options to configure the load balancing, which opens up possibilities for new use cases!

The List/Fetch pattern is described in my previous post. In short, a first processor (ListX), running only on the primary node, lists the data available on X and generates one flow file per object to retrieve (the flow file has no content but carries the metadata needed to fetch the object). The flow files are then distributed among the NiFi nodes, and the FetchX processor, running on all nodes, actually retrieves the data. This way you ensure there is no concurrent access to the same object and you distribute the work across your cluster.
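The pattern can be sketched in a few lines of Python. This is only a toy simulation, not NiFi code: the function names, the file names and the node names are made up, and the distribution step simply hands out flow files to the nodes in turn.

```python
from itertools import cycle


def list_objects(remote_files):
    """ListX step (primary node only): one content-less record per object."""
    return [{"filename": name} for name in remote_files]


def distribute(flow_files, nodes):
    """Hand the listed flow files to the cluster nodes in turn."""
    assignment = {node: [] for node in nodes}
    for flow_file, node in zip(flow_files, cycle(nodes)):
        assignment[node].append(flow_file)
    return assignment


def fetch(assignment):
    """FetchX step (all nodes): each node retrieves only its own objects."""
    return {
        node: [f"content of {flow_file['filename']}" for flow_file in flow_files]
        for node, flow_files in assignment.items()
    }


listed = list_objects(["a.csv", "b.csv", "c.csv", "d.csv"])
per_node = distribute(listed, ["node2", "node3", "node4"])
print(fetch(per_node))
```

Each object is listed once and fetched by exactly one node, which is the whole point of the pattern.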

List/Fetch pattern before NiFi 1.8.0

If we have a project A retrieving data from an FTP server using the List/Fetch pattern to push the data into HDFS, it’d look like this:

  • Root Process Group level

  • Inside the Process Group dedicated to Project A

The ListFTP is running on the primary node and sends the data to the RPG which load balances the flow files among the nodes. Flow files are pushed to the input port at the root level and the data can then be moved back down to the process group of the project. Then the FetchFTP actually retrieves the data and the data is sent to HDFS.

List/Fetch pattern with NiFi 1.8+

Now… it looks like this:

  • Root Process Group level

  • Inside the Process Group dedicated to Project A

It’s crazy, no? You don’t have anything outside of the process group anymore, the workflow is cleaner/simpler, and authorizations are much easier to manage.

Where is the trick? Did you notice the small icon on the relationship between the ListFTP and the FetchFTP? It looks small but it’s HUGE :).

Load balancing strategies in NiFi 1.8+

Let’s have a look at the configuration of a connection:

There is a new parameter available: the load balance strategy. It defaults to “do not load balance” and, unless you need to, you won’t change that parameter (you don’t want to move data between your nodes at each step of the workflow if there is no reason to do so).

Here are the available strategies:

The Round robin strategy is the one you would probably use in a List/Fetch use case. It will ensure your data is evenly balanced between your nodes.

The Single node strategy allows you to send the data back to one single node. You can see it a bit like a reducer in a MapReduce job: you process the data on all the nodes and then you want to perform a step on a single node. One example could be: I have a zip file containing hundreds of files, I unzip the file on one node, load balance all the flow files (using the Round robin strategy for example) among the nodes, process the files on all the nodes and then send the flow files back to a single node to compress the data back into a single file. It could look like this:

Then you have the Partition by attribute strategy, which sends all the flow files sharing the same value for an attribute to the same node. For example, let’s say you receive data on NiFi behind a load balancer: you might want to have all the data coming from a given group of sources on the same node.
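To compare the strategies described above, here is a toy model of how each one picks a node for a flow file. It is an illustration under my own assumptions, not NiFi’s implementation: the function names are hypothetical, the single node is arbitrarily the first one in the list, and the partitioning uses a plain CRC32 hash.

```python
import zlib


def round_robin(flow_files, nodes):
    """Hand out flow files to the nodes in turn."""
    return [nodes[index % len(nodes)] for index, _ in enumerate(flow_files)]


def single_node(flow_files, nodes):
    """Send everything to one node (here, arbitrarily, the first one)."""
    return [nodes[0] for _ in flow_files]


def partition_by_attribute(flow_files, nodes, attribute):
    """Same attribute value, same node: hash the value to choose the node."""
    def pick(flow_file):
        digest = zlib.crc32(str(flow_file[attribute]).encode("utf-8"))
        return nodes[digest % len(nodes)]
    return [pick(flow_file) for flow_file in flow_files]


nodes = ["node2", "node3", "node4"]
flow_files = [{"filename": value} for value in ["2", "3", "4", "2", "3"]]
print(round_robin(flow_files, nodes))
print(single_node(flow_files, nodes))
print(partition_by_attribute(flow_files, nodes, "filename"))
```

Note that partitioning guarantees that equal values land on the same node, but it does not guarantee an even spread across nodes.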

I won’t go into much more detail, but feel free to have a look at the documentation. Besides, I’m sure other members of the Apache NiFi community will publish blog posts on this subject… such as this excellent one!

Let’s just give it a try with each strategy… I’m using a GenerateFlowFile (GFF) connected to an UpdateAttribute and we will list the flow files queuing in the relationship to check where the flow files are located. Besides, the GenerateFlowFile is configured to set a random integer between 2 and 4 for the attribute ‘filename’.

Let’s start with a GFF running on the primary node only with no load balancing strategy:

My primary node being my node2 (I have node2, node3 and node4 in my cluster):

I can confirm all the flow files are on the primary node:

Let’s change the Load Balance strategy to Round Robin. We can confirm the data is evenly distributed:

Let’s change the strategy to One single node. We can confirm the data is now back to a single node (not necessarily the primary node):

And now let’s try the partitioning by attribute using the ‘filename’ attribute. We can confirm that all the flow files sharing the same value in the Filename column are on the same node:

Again, I expect to see additional blog posts on this subject with more technical insights on how it actually works and what you should consider in case your cluster is scaling up and down.

Also… this new feature comes with another one which is just as exciting: the offloading of nodes in a cluster (look at the Admin guide in the documentation). In case you want to decommission a node in a cluster, this will take care of moving its data to the other nodes before you actually remove the node for good. This will be particularly useful when deploying NiFi on Kubernetes and scaling your cluster up and down!

As always, feel free to comment and/or ask questions! Thanks for reading!

Monitoring Driven Development with NiFi 1.7+

I already discussed the Monitoring Driven Development (MDD) approach in this post and already covered some monitoring topics around NiFi in a few posts (reading them would probably help you follow this one, or at least refresh your memory on some concepts). In this post I want to present the MDD approach once again, with some new features of NiFi 1.7 that make things a bit easier and cleaner.

What do I mean when talking about MDD in NiFi? When I say MDD, I refer to the fact that, when a team is developing a “core workflow” for a use case, the team should also develop a “monitoring workflow” that will be in charge of monitoring and reporting the health of the “core workflow” based on their business requirements.

How does this work in NiFi? This approach completely relies on two things:

  • A proper naming convention for the components used in the workflows
  • The use of the Site-to-Site Reporting Tasks (see here)

Basically, the idea is to set up the S2S reporting tasks to send data to an input port dedicated to monitoring. Basic routing and filtering is then performed, assuming projects follow the naming convention, so that all the data related to a given “core workflow” is automatically routed to the associated “monitoring workflow”. This way, it’s up to each project to implement the monitoring they want for their use case based on their requirements and SLAs.

What’s new in NiFi that’s going to help me with MDD?

There are quite a few interesting JIRAs but the two most important ones are:

  • [NIFI-4814] – Add distinctive attribute to S2S reporting tasks (NiFi 1.6.0)
  • [NIFI-5122] – Add record writer to S2S Reporting Tasks (NiFi 1.7.0)

Example

For my example, the naming convention is the following: any component that should be monitored must be named with a prefix ABC_, where ABC is the three-letter abbreviation representing the use case/project.

I have two projects, one named PJA and one named PJB.
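As an illustration of the convention, this is how the project could be derived from a component name in plain Python. It is only a sketch: in the flow itself this is done with an UpdateRecord processor, and the exact pattern (three uppercase letters followed by an underscore) as well as the component names used below are my assumptions.

```python
import re

# Assumed form of the convention: three uppercase letters, then an underscore.
PROJECT_PREFIX = re.compile(r"^([A-Z]{3})_")


def project_of(component_name: str):
    """Return the project abbreviation, or None if the name has no prefix."""
    match = PROJECT_PREFIX.match(component_name)
    return match.group(1) if match else None


for name in ["PJA_HandleHttpRequest", "PJB_ListenTCPRecord", "LogAttribute"]:
    print(name, "->", project_of(name))
```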

Let’s say the project A is using HandleHttpRequest/Response to receive XML data over HTTP and has two monitoring requirements:

  • Send an email to the team in case a bulletin is generated
  • Monitor in Grafana the amount of valid/invalid data being processed

Let’s say the project B is using ListenTCPRecord for a use case around logs ingestion and has one monitoring requirement:

  • Send an email to the team in case back pressure is enabled

The development of the workflows is outside the scope of this post. I’ll keep things simple as the objective is to demonstrate the MDD approach, not how to implement the workflows of the specific use cases I chose.

A *very basic* implementation of the “core workflow” of PJA could be:

A *very basic* implementation of the “core workflow” of PJB could be:

Now I’m going to configure the S2S Reporting Tasks – one for Controller Status data and one for Bulletins data. Note that there is a new feature allowing users to select a Record Writer for the generated data. I’m going to use the Avro one for the reporting tasks so that I don’t have to care about the schemas at all. For that I need to create a Controller Service (in Controller Settings menu since this controller service is exclusively used by reporting tasks).

After enabling the CS, I can configure my reporting tasks.

The one for bulletins:

And the one for controller status:

Note that you have additional S2S Reporting Tasks if needed. The one about provenance events can be particularly useful for some monitoring requirements such as latency.

At the root level of my canvas, I have something looking like this:

I’m receiving all the monitoring data sent by the reporting tasks in the ‘monitoring’ input port. Then in my ‘Monitoring’ process group, I have something looking like:

You can notice I’m sending all the data to a ‘Technical Monitoring’. That’s because the team in charge of administering/monitoring the platform probably also wants to look at the monitoring data to ensure the full system is performing OK (common examples: send the bulletins to an external tool like ES/Kibana, send email alerts in case back pressure is enabled on a connection, etc.).

The upper part of the workflow is where I process the monitoring data to split it and route it by project. First I’m using a RouteOnAttribute to route the data based on the type (bulletins, controller status, etc.):

Then I’m using an UpdateRecord processor to add a ‘project’ field that I’m computing based on the component name.

For bulletins:

For controller status:

Since the data has been sent in Avro format by the reporting task, I’m using an Avro reader Controller Service with default settings. And, from now on, I want the data to be in JSON (as it’ll be easier in case I want to send the data via an email or via HTTP). I’m configuring a few controller services: one Avro Schema Registry containing the schemas (that you can retrieve in the additional details of the reporting task documentation, see below), one JSON Reader and one JSON Writer.

For instance, to get the schema of the S2S Bulletin Reporting Task, go to the reporting task list:

Click on the documentation icon on the left:

And then click Additional Details:

When adding the schemas in the Avro Schema Registry, use the type of the reporting task as the name of the schema and don’t forget to add a ‘project’ field in the schemas:

Now you can easily reference the schemas to use in the configuration of the JSON Reader/Writer using the expression language because the flow files generated by the reporting tasks have an attribute with the type of the source reporting task.

Example of attributes on a flow file generated by a RT:

JSON Reader:

JSON Writer:

Screen Shot 2018-08-29 at 6.04.13 PM.png

Now that I have a ‘project’ field in my data with the project name (if the naming convention has been correctly used), I can use a QueryRecord processor to route the data for each project:

Screen Shot 2018-08-29 at 6.07.17 PM.png

And we can confirm that only the data for project A is routed to the ‘Monitoring Workflow’ for project A by looking at the data in the relationship routing data for PJA:

Screen Shot 2018-08-29 at 6.08.53 PM.png
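To make the routing idea concrete outside of NiFi, here is a minimal Python sketch of what the per-project routing amounts to. In NiFi, each dynamic property of QueryRecord holds a SQL query over the flow file content and becomes its own relationship; the sketch below only mimics the result. The function name, the ‘unmatched’ bucket and the sample records are assumptions made for illustration, not something taken from the workflow.

```python
from collections import defaultdict

def route_by_project(records, known_projects):
    """Group monitoring records by their 'project' field.

    Records whose project is missing or unknown go to 'unmatched',
    a hypothetical bucket standing in for data that no query selects.
    """
    routes = defaultdict(list)
    for record in records:
        project = record.get("project")
        key = project if project in known_projects else "unmatched"
        routes[key].append(record)
    return dict(routes)

# Illustrative records only; real bulletins carry many more fields.
records = [
    {"project": "PJA", "bulletinLevel": "ERROR"},
    {"project": "PJB", "bulletinLevel": "WARNING"},
    {"project": None, "bulletinLevel": "INFO"},
]
routes = route_by_project(records, known_projects={"PJA", "PJB"})
```

Adding a new project is then just a matter of adding its name to the set of known projects, which mirrors adding one dynamic property on the QueryRecord processor.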

Now, when deploying a new project for the first time (i.e. importing the process group of the “core workflow”), you just need to update the QueryRecord processor to add a dynamic property for the new project and deploy the process group for the “monitoring workflow” of the project. In combination with the NiFi Registry it makes things really smooth when you need to update the workflows with a new version.

I’m not going to develop the monitoring workflows for PJA and PJB but it’d only be a matter of a few record processors to filter the data (bulletin? back pressure enabled?), and then a processor to send the data (ElasticSearch, PutEmail, InvokeHTTP, etc):

  • “send an email when a bulletin is generated” – RouteOnAttribute to get the bulletins data and PutEmail to send an email
  • “monitor the number of valid/invalid XML files being processed” – filter the controller status statistics on the ‘Connection’ elements and keep only the statistics of the connections after the ValidateRecord processor, this can be easily done with a QueryRecord processor. The numbers can then be sent to an external system such as ElasticSearch for a dashboard in Kibana/Grafana
  • “send an email when back pressure is enabled” – use a QueryRecord processor on the controller status data by querying the field ‘isBackPressureEnabled’ and then use PutEmail processor.
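As an illustration of the last item, the filter on the controller status data could look like the following Python sketch. The field ‘isBackPressureEnabled’ is the one mentioned above; ‘componentType’ and ‘componentName’ are assumed field names, and the sketch assumes the flag is a boolean. Check the schema in the additional details of the reporting task before relying on any of this.

```python
def connections_with_back_pressure(status_records):
    """Return the names of the connections that have back pressure enabled.

    Assumes each record is a dict with 'componentType', 'componentName'
    and a boolean 'isBackPressureEnabled' (assumed field names).
    """
    return [
        record["componentName"]
        for record in status_records
        if record.get("componentType") == "Connection"
        and record.get("isBackPressureEnabled")
    ]

# Illustrative data only.
status_records = [
    {"componentType": "Connection", "componentName": "success", "isBackPressureEnabled": True},
    {"componentType": "Connection", "componentName": "failure", "isBackPressureEnabled": False},
    {"componentType": "Processor", "componentName": "ValidateRecord"},
]
alerts = connections_with_back_pressure(status_records)
```

Every name in the resulting list would then trigger one email through PutEmail.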

The full workflow of this example is available as a gist here. Note that, since it’s a template, it does not contain the Reporting Tasks configuration. But you should be up and running quite quickly with the above screenshots.

As always, thanks for reading this post, feel free to comment and/or ask questions. In case you have suggestions to make things even better, just let me know and, maybe, I’ll have to publish a new version of this post to present the new features of a next release 😉

NiFi 1.7+ – Terminate threads

One of the new features coming with NiFi 1.7.0 is the possibility to terminate stuck threads directly from the UI. Before this release, when you had a processor getting stuck (like a custom processor with a deadlock) you had no option but to restart NiFi… and that’s not really great in a multi-tenant setup.

Let’s see this new feature with an example: I’m using a GenerateFlowFile and, using the debug mode of my IDE, I’m going to simulate an issue with the thread by adding a break point in the onTrigger() method of the processor.

Screen Shot 2018-07-02 at 2.56.57 PM.png

When the processor is running, we can see the number of threads currently used by the processor (top right). If the number of tasks is not increasing (and equal to 0 after 5 minutes) and the number of threads is constant, it probably means you have a stuck thread. In this case, it is always recommended to do a thread dump of NiFi to see what is going on. To do that:

./bin/nifi.sh dump /tmp/thread-dump.txt

If we look at the content of the generated file and search for my GenerateFlowFile processor, we can see something like:

"Timer-Driven Process Thread-5" Id=57 RUNNABLE (suspended)
 at org.apache.nifi.processors.standard.GenerateFlowFile.onTrigger(GenerateFlowFile.java:210)
 at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
 at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
 at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
 at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 Number of Locked Synchronizers: 1
 - java.util.concurrent.ThreadPoolExecutor$Worker@2388307

In this case, the thread is shown as “suspended” at the line where I put my break point in my IDE.
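On a busy instance the dump can be long, so a small script helps to find the threads of a given processor. The sketch below is a minimal Python example that assumes the dump format shown above (a quoted thread name followed by `Id=` and the state, then the stack lines); the function name is hypothetical.

```python
import re

# Header line of a thread in the dump: "name" Id=NN STATE
THREAD_HEADER = re.compile(r'^"(?P<name>[^"]+)" Id=(?P<id>\d+) (?P<state>.+)$')

def find_threads(dump_text, needle):
    """Return (name, id, state) for each thread whose stack mentions `needle`."""
    matches = []
    current = None
    for line in dump_text.splitlines():
        header = THREAD_HEADER.match(line.strip())
        if header:
            current = (header["name"], int(header["id"]), header["state"])
        elif current and needle in line:
            matches.append(current)
            current = None  # report each thread only once
    return matches

sample = '''"Timer-Driven Process Thread-5" Id=57 RUNNABLE (suspended)
 at org.apache.nifi.processors.standard.GenerateFlowFile.onTrigger(GenerateFlowFile.java:210)
 at java.lang.Thread.run(Thread.java:745)
"Timer-Driven Process Thread-6" Id=58 TIMED_WAITING
 at java.lang.Thread.sleep(Native Method)
'''
stuck = find_threads(sample, "GenerateFlowFile")
```

With the real file, you would read `/tmp/thread-dump.txt` and pass its content instead of `sample`.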

Note that having a stuck thread is usually a symptom of a badly designed processor or an underlying issue with the code dependencies of the processor. Having the thread dump can help locate and fix the issue.

If I try to stop the processor:

Screen Shot 2018-07-02 at 3.02.23 PM.png

I’ll now see two threads being used by the processor:

Screen Shot 2018-07-02 at 3.03.09 PM

The processor is now in the process of being stopped but you won’t be able to update or restart the processor until it is actually stopped. However, if the initial thread is stuck, you’ll remain in this state forever (and, eventually, you’ll have to restart NiFi if you’re running a version below 1.7.0).

With NiFi 1.7.0, you now have the possibility to terminate the thread (meaning you don’t have to wait for the thread to be completely stopped):

Screen Shot 2018-07-02 at 3.06.58 PM

If I terminate the thread, here is what I’ll see:

Screen Shot 2018-07-02 at 3.08.07 PM

Meaning there is no more active thread and there is one thread being terminated.

Under the hood, the framework is issuing an “interrupt” for the thread and will perform a “reload” of the processor meaning there is a new instance of the processor class being created. This allows the old class to eventually shut down gracefully, close connections, etc. However this needs to be used with care: if the class is maintaining some values in internal variables, this information will be lost in the process (I’m not talking about state information that can be saved at framework level by the processors).

An interrupt is an indication to a thread that it should stop what it is doing and do something else. It’s up to the programmer to decide exactly how a thread responds to an interrupt, but it is very common for the thread to terminate. However if, for some reason, the thread does not respond to the interrupt, then the thread will stay in the “being terminated” state forever (in my above example, I can see the thread staying in that state because of my break point).

You can now update the configuration and start the processor again (even if a thread is being terminated). Once the thread is terminated, you’ll get back to a nominal situation:
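The cooperative nature of an interrupt can be illustrated with a small Python sketch. Python has no direct equivalent of Java’s thread interrupt, so a `threading.Event` stands in for the interrupt flag here; this is an analogy under that assumption, not what the NiFi framework does. One worker checks the flag and stops, the other never looks at it and keeps running until something else frees it (like resuming from my break point).

```python
import threading
import time

def cooperative_worker(stop_requested, log):
    """Checks the stop flag regularly, so it ends soon after the request."""
    while not stop_requested.is_set():
        time.sleep(0.01)
    log.append("cooperative worker stopped")

def stubborn_worker(stop_requested, release):
    """Never looks at the stop flag; only `release` (the 'break point') frees it."""
    release.wait()

stop_requested = threading.Event()
release = threading.Event()
log = []

good = threading.Thread(target=cooperative_worker, args=(stop_requested, log))
bad = threading.Thread(target=stubborn_worker, args=(stop_requested, release), daemon=True)
good.start()
bad.start()

stop_requested.set()          # the "interrupt": a request, not a forced kill
good.join(timeout=2)
bad.join(timeout=0.2)

good_alive = good.is_alive()  # the cooperative worker honoured the request
bad_alive = bad.is_alive()    # the stubborn worker is still "being terminated"

release.set()                 # e.g. resuming from the break point
bad.join(timeout=2)
```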

Screen Shot 2018-07-02 at 4.54.51 PM.png

If you don’t want to set up a debug environment, you can simulate a stuck thread with the following ExecuteScript processor:

Screen Shot 2018-07-02 at 4.56.49 PM.png

If I start and then stop the processor:

Screen Shot 2018-07-02 at 6.02.37 PM.png

And I’ll get the following message when terminating the thread:

Screen Shot 2018-07-02 at 6.02.58 PM.png

You can also use the DebugFlow processor with “@OnStopped Pause Time” set to something like “5 min” and “Ignore Interrupts When Paused” set to “true”:

Screen Shot 2018-07-02 at 6.49.29 PM.png

That’s it for this post! This new feature is really nice in a multi-tenant environment where you can’t afford a restart of the service. If you’re in a situation where you need to use this feature, remember to take a thread dump before actually terminating the thread. This will be really useful to investigate the issue. Also, remember that terminating a thread is not a “normal” operation, it means there is something wrong somewhere and it could very likely happen again.

Thanks for reading this post and, as always, feel free to comment / ask questions!

NiFi 1.7+ – XML Reader/Writer and ForkRecord processor

Starting with NiFi 1.7.0 and thanks to the work done by Johannes Peter on NIFI-4185 and NIFI-5113, it’s now possible to use an XML reader and writer in the Record processors to help you process XML data. Before that, you had a few options requiring a bit of additional work to get things working (see here).

I won’t go into the details because the reader/writer are really well documented (have a look at the additional details for examples).

Also, with NIFI-4227, a ForkRecord processor is now available to “explode” your data when you have one or multiple embedded arrays in your data set and you need to normalize your data. Have a look at the ForkRecord documentation (additional details) for examples.

Example – Introduction

Let’s go through an example using the XML Reader and the ForkRecord processor. I assume I’m receiving XML data with the following schema:

Screen Shot 2018-06-27 at 10.39.20 AM.png

And here is a dummy file I’m receiving that I’ll use for this example:

Screen Shot 2018-06-27 at 2.15.28 PM.png

The corresponding Avro schema can be found here. Defining the Avro schema corresponding to your data is the most “difficult” part but once it’s done, everything else is really easy with the Record processors. You can get some help using the InferAvroSchema processor but this should only be used to get an initial version of your Avro schema when you start developing your workflows (it should not be used in your production workflows).

I’m going to quickly explain the following workflow (template available here):

Screen Shot 2018-06-27 at 2.24.43 PM.png

Example – XML to JSON conversion

The GenerateFlowFile is used to generate my XML data and to send the content to the Record processors I’m using. The upper part of the workflow is just a ConvertRecord processor to perform the XML to JSON conversion thanks to the schema. The other parts of the workflow are used to extract each “object” of my schema (customer, address, account and transaction).

Here are the configuration details for the XML to JSON conversion. The GenerateFlowFile is very simple:

Screen Shot 2018-06-28 at 2.25.23 PM

Note the attribute ‘schema.name’ set with the value of the schema I added in my Avro schema registry controller service:

Screen Shot 2018-06-28 at 2.26.24 PM.png

The XML Reader is configured to get the schema name from the ‘schema.name’ attribute of the processed flow files:

Screen Shot 2018-06-28 at 2.27.21 PM.png

The JSON writer does not contain any specific configuration, it’ll write the data using the schema inherited at the reader level:

Screen Shot 2018-06-28 at 2.28.22 PM

The ConvertRecord processor can now be configured for the XML to JSON conversion:

Screen Shot 2018-06-28 at 2.29.20 PM.png

That’s it! You now have a very easy and powerful way to perform your XML to JSON conversion. Here is the flow file content before the processor:

Screen Shot 2018-06-27 at 2.15.28 PM

Here is the JSON generated by the processor:

Screen Shot 2018-06-28 at 2.31.23 PM

If you need to manipulate your records to go from one schema to another, you can use the UpdateRecord processor.

Note – using the XML Reader/Writer and Record processors, you can expect performance as good as the best options presented in my previous post about XML data processing.

Example – ForkRecord processor

The goal here is to extract the data contained in the arrays into separate CSV files. From one XML file, I want to generate 4 CSV files that I could use, for instance, to send data into database tables.

Let’s start with the first one regarding customer data. I’m adding a ‘customer’ Avro schema in the Avro schema registry:

Screen Shot 2018-06-28 at 2.42.58 PM.png

I’m using an UpdateAttribute to set the name of the ‘output.schema’ that I want to be used by the CSV Record Writer. Then, I’m just using a ConvertRecord processor to convert from XML to CSV, keeping only the root level fields (customer data). Here is the configuration of my CSV writer:

Screen Shot 2018-06-28 at 2.45.23 PM.png

Here is the output when processing my XML data:

Screen Shot 2018-06-28 at 2.46.32 PM.png

Now, I want to extract the data contained in arrays and that’s where the ForkRecord is useful. To use it, I need to specify a custom property containing the Record path to the array that the processor should process. The processor then has two modes:

  • Split – that will preserve the input schema but will create one flow file per element contained in the array.
  • Extract – that will generate flow files using the elements of the arrays with the possibility to include the parent fields up to the root level in case you want to keep some fields as keys/identifiers of your array elements. Note that you can define which parent fields you want to preserve by setting the appropriate schema with only the fields you want.
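The difference between the two modes can be sketched in a few lines of Python. This is a simplified model of the behaviour described above, working on plain dictionaries rather than on records and schemas; the function names and the sample customer are hypothetical, and the array is addressed by a simple key instead of a Record path.

```python
import copy

def fork_split(record, array_field):
    """Split mode (simplified): one output per array element, input structure kept."""
    outputs = []
    for element in record[array_field]:
        clone = copy.deepcopy(record)
        clone[array_field] = [element]
        outputs.append(clone)
    return outputs

def fork_extract(record, array_field, parent_fields=()):
    """Extract mode (simplified): output the elements, optionally with parent fields."""
    outputs = []
    for element in record[array_field]:
        out = {name: record[name] for name in parent_fields}
        out.update(element)
        outputs.append(out)
    return outputs

# Hypothetical data, not the dummy file used in this post.
customer = {
    "customer_id": 1,
    "name": "John",
    "addresses": [
        {"city": "Paris"},
        {"city": "London"},
    ],
}
split = fork_split(customer, "addresses")
extracted = fork_extract(customer, "addresses", parent_fields=("customer_id",))
```

In the sketch, `parent_fields` plays the role of the output schema: only the parent fields listed there end up in the extracted elements.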

In our case we’re using the Extract mode and we are including the parent fields: for the ‘address’ data for instance, we want to keep the customer_id field in the output (that field would be used as a key between the customer data and the address data in the database world). Here is the schema I define in my Avro schema registry for the ‘address’ object:

Screen Shot 2018-06-28 at 2.52.03 PM

I’m using an UpdateAttribute to set the attributes I’m using in my reader and the ForkRecord processor:

Screen Shot 2018-06-28 at 2.55.04 PM

And I’m configuring the ForkRecord processor to use the ‘fork.path’ attribute defining the array of elements to be processed:

Screen Shot 2018-06-28 at 2.55.59 PM.png

Note – if you have multiple Record paths pointing to multiple arrays with elements using the same output schema, you can define as many custom properties as you want.

And here is the output after the ForkRecord processor:

Screen Shot 2018-06-28 at 2.56.40 PM

Here are the Record paths I’m setting for the other types of objects I want to extract:

  • Account – fork.path = /accounts/account
  • Transaction – fork.path = /accounts/account[*]/transactions/transaction
    In this case, I need to specify account[*] because I want to access all the transactions of all the elements in the account array. But I could use the Record path features to only access some specific accounts based on some predicates.
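For the nested case, what `account[*]` expresses is a walk over every account before reaching its transactions. A rough Python equivalent, assuming a hypothetical customer structure where arrays are plain lists and where ‘amount’ is an invented transaction field, would be:

```python
def extract_transactions(customer):
    """Emit every transaction of every account with its parent keys attached."""
    rows = []
    for account in customer["accounts"]:          # account[*]: all the accounts
        for transaction in account["transactions"]:
            row = {
                "customer_id": customer["customer_id"],
                "account_id": account["account_id"],
            }
            row.update(transaction)
            rows.append(row)
    return rows

# Hypothetical data for illustration.
customer = {
    "customer_id": 1,
    "accounts": [
        {"account_id": 10, "transactions": [{"amount": 5}, {"amount": 7}]},
        {"account_id": 11, "transactions": [{"amount": 9}]},
    ],
}
rows = extract_transactions(customer)
```

Restricting the outer loop with a condition would correspond to using a predicate in the Record path to select only some accounts.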

And here is the output I get (after defining the appropriate schema in my schema registry):

  • Account data (I only keep the customer_id from the parent fields):

Screen Shot 2018-06-28 at 3.02.14 PM.png

  • Transaction data (I only keep the customer_id and account_id from the parent fields):

Screen Shot 2018-06-28 at 3.03.55 PM

Conclusion

The XML Reader & Writer are a really nice addition to NiFi if you need to process XML data and they will make your existing workflows much simpler! You really have to look at the Record processors and concepts as soon as you have a use case manipulating data complying with a schema.

The ForkRecord processor is a new processor that you can use if you need to manipulate schema oriented data containing a lot of arrays. That can be useful in case you need to normalize data before ingestion into databases.

Thanks for reading this post and, as always, feel free to comment and/or ask questions!

NiFi workflow monitoring – Wait/Notify pattern with split and merge

Thanks to NIFI-4262 and NIFI-5293, NiFi 1.7.0 contains a small improvement allowing users to extend the Wait/Notify pattern to merging situations.

If you’re not familiar with the Wait/Notify concept in NiFi, I strongly recommend reading this great post from Koji about the Wait/Notify pattern (it’ll make this post much easier to understand).

When using the Merge* processors, you have one relationship to route the flow file containing the merged data and one relationship to route the original flow files used in the merging operation. With NIFI-4262 for the MergeContent processor (and NIFI-5293 for the MergeRecord processor), the original flow files used in the merge operation will now contain an attribute with the UUID of the flow file generated by the merge operation. This way, you now have a way to link things together and that’s really useful to leverage the Wait/Notify pattern.

Use case presentation

Let’s demonstrate the feature with a use case: I’m receiving ZIP files containing multiple CSV files that I want to merge together while converting the data into Avro and send it into a file system (for simplicity here, I’ll send the data to my local file system, but it could be HDFS for instance).

The idea is the following:

ListFile -> FetchFile -> UnpackContent -> MergeRecord -> PutFile

That’s easy and it works great… BUT… I want to be able to monitor and track exactly how each ZIP file is handled and when the data contained in a ZIP file is stored in the destination. I have a requirement to maintain in real-time the following tables:

Table ‘zip_files_processing’

  • zip_file_name – unique name of the received ZIP files
  • ff_uuid – flow file UUID (useful to replay events in case of errors)
  • ingestion_date – date when the zip file starts being processed in NiFi
  • storage_date – date when all the data of the ZIP file has been processed
  • number_files – number of CSV files contained in the ZIP file
  • number_files_OK – number of files successfully stored in the destination
  • number_files_KO – number of files unsuccessfully processed
  • status – current status of the process for the ZIP file (IN_PROGRESS, OK, KO)

Table ‘files_ingestion’

  • zip_file_name – unique name of the zip file containing the CSV file
  • file_name – name of the CSV file
  • ff_uuid – flow file UUID (useful to replay events in case of errors)
  • storage_date – date when the CSV file has been processed
  • storage_name – name of the destination file containing the data
  • status – current status of the process for the CSV file (IN_PROGRESS, OK, KO)

That’s where the Wait/Notify is going to be extremely useful.

High level description of the workflow

Here is the main idea behind the workflow:

Screen Shot 2018-06-13 at 4.14.17 PM.png

Let’s describe the workflow at a high level:

  • List the data available and insert rows in my zip_files_processing table for each listed file so that I can track ingested data
  • Fetch the data (in case of error, I update the zip_files_processing table to set the status to KO)
  • I unpack my ZIP files to get a flow file per CSV file, and I route the original flow file to a Wait processor. The flow file representing the ZIP file will be released only once all the associated CSV files are fully processed
  • For each CSV file, I insert a row in my files_ingestion table to track the status of each CSV file being processed
  • I merge the data together while converting the data from CSV to Avro, and I route the original flow files to a Wait processor. The flow files will be released only once the merged flow file has been sent to the final destination
  • I send the merged flow file to the destination (local file system in this case) and then use the Notify processor to release all the original flow files used to create the merged flow file
  • Then I use the released flow files to notify the first Wait processor and also release the flow file representing the corresponding ZIP file
  • With all the released flow files, I can update my monitoring tables with the appropriate information

Workflow details

Here is the full configuration of the workflow and some explanations around the parameters I used. Also, here is the template of the workflow I used – it’ll be easier to understand if you have a look, give it a try and play around with it (even without the SQL processors) – note that you will need to add a Distributed Map Cache server controller service to have the Distributed Map Cache clients working with Wait/Notify processors:

Screen Shot 2018-06-26 at 1.50.35 PM.png

Let’s start describing the ingestion part:

Screen Shot 2018-06-26 at 1.52.57 PM.png

Nothing very strange here. Just note the UpdateAttribute processor I’m using to store the original UUID in a dedicated attribute so that I don’t lose it with the UnpackContent processor (that creates new flow files with new UUIDs).

Screen Shot 2018-06-26 at 1.56.13 PM

The first PutSQL is used to insert new lines in my ZIP monitoring table for each ZIP file I’m listing. I’m executing the below query:

Screen Shot 2018-06-26 at 1.55.31 PM

The second PutSQL is used in case of error when fetching the file from the local file system or when an error occurs while unpacking the archive. In such a case, I’m executing the below query:

Screen Shot 2018-06-26 at 1.57.24 PM.png

Let’s now focus on the second part of the workflow:

Screen Shot 2018-06-26 at 1.58.28 PM

The UnpackContent processor is used to extract the CSV files from the ZIP file. For one flow file representing the ZIP file, it will generate one flow file per CSV file contained in the ZIP file. The original flow file will then be routed to the original relationship while flow files for CSV data will be routed to the success relationship. The flow files containing the CSV data will have a ‘fragment.identifier’ attribute and that’s the common attribute between the original flow file and the generated ones that I’m going to use for Wait/Notify (this attribute is automatically generated by the processor). Besides, the original flow file also has a ‘fragment.count’ attribute containing the number of generated flow files for the CSV data.

The Wait processor is configured as below:

Screen Shot 2018-06-26 at 2.03.46 PM

Basically I’m setting the identifier of the release signal to the common attribute shared between the original and the generated flow files, and I’m saying that I have to wait for ‘fragment.count’ signals before releasing the original flow file to the success relationship. Until then, the flow file is transferred to the ‘wait’ relationship which is looping back on the Wait processor. Also, I configured a 10-minute timeout in case I don’t receive the expected signals, and in this case the flow file would be routed to the ‘expired’ relationship.

After my Wait processor, in case of error, I’m using an UpdateAttribute to retain this error:

Screen Shot 2018-06-26 at 2.08.50 PM.png

And after that, I’m using the PutSQL processor to update the ZIP monitoring table by executing the below query:

Screen Shot 2018-06-26 at 2.09.46 PM.png

The ‘wait.*’ attributes are generated by the Wait processor when releasing the flow file and contain the number of signals sent by the Notify processor (I’ll come back to that when describing the corresponding Notify processor). In this case, it allows me to know how many files contained in the ZIP file have been successfully processed and how many have not been successfully processed.
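To make the counter mechanics concrete, here is a minimal Python sketch of this first Wait processor. An in-memory class stands in for the distributed map cache; the class and method names are hypothetical and the logic is a simplification of the behaviour described above (release once the total number of signals reaches ‘fragment.count’, and expose each counter as a ‘wait.counter.*’ attribute).

```python
from collections import Counter, defaultdict

class SignalCache:
    """In-memory stand-in for the cache shared by the Wait and Notify processors."""

    def __init__(self):
        self._signals = defaultdict(Counter)

    def notify(self, signal_id, counter_name, delta=1):
        """Increment one named counter of a signal (what Notify does)."""
        self._signals[signal_id][counter_name] += delta

    def try_release(self, flow_file, signal_id, target_count):
        """Return the flow file with wait.counter.* attributes, or None if still waiting."""
        counters = self._signals[signal_id]
        if sum(counters.values()) < target_count:
            return None  # would stay in the 'wait' relationship
        released = dict(flow_file)
        for name, value in counters.items():
            released[f"wait.counter.{name}"] = value
        return released

cache = SignalCache()
zip_flow_file = {"filename": "data.zip", "fragment.identifier": "abc", "fragment.count": 3}

cache.notify("abc", "OK")
cache.notify("abc", "OK")
still_waiting = cache.try_release(zip_flow_file, "abc", zip_flow_file["fragment.count"])
cache.notify("abc", "KO")
released = cache.try_release(zip_flow_file, "abc", zip_flow_file["fragment.count"])
```

After the third signal, the released flow file carries the number of OK and KO files, which is exactly what the update query on the ZIP monitoring table needs.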

After the UnpackContent and the success relationship, I’m using a PutSQL to insert lines in my CSV monitoring table by executing the below query:

Screen Shot 2018-06-26 at 2.19.41 PM

The ‘segment.original.filename’ is the name of the ZIP file from which the CSV files have been extracted.

Let’s move to the final part of the workflow (it’s a bit loaded because I wanted to have everything in my screenshot):

Screen Shot 2018-06-26 at 2.27.15 PM.png

The MergeRecord is used to merge the data together while converting from CSV to Avro:

Screen Shot 2018-06-26 at 2.28.11 PM

In a real world use case, you would configure the processor to merge much more data together… but for the below demonstration I’m merging a few flow files together so that I can quickly see the results.

Also I’m generating dummy data like:

Screen Shot 2018-06-26 at 2.29.41 PM.png

And the corresponding schema I’m using in the Avro Schema Registry controller service is:

Screen Shot 2018-06-26 at 2.30.55 PM.png

After the MergeRecord processor, the flow files used during the merge operation are routed to the ‘original’ relationship, and the flow files containing the merged data are routed to the ‘merged’ relationship. As explained above, the original flow files contain a ‘merge.uuid’ attribute with the UUID of the merged flow file. That’s the attribute I’m using in the Wait processor configuration:

Screen Shot 2018-06-26 at 2.35.49 PM

Basically, I’m creating a signal identifier in the distributed cache with the ‘merge.uuid’ attribute and, as long as the signal counter is at least 1, I’m releasing one flow file at a time. You’ll see in my Notify configuration (coming back to that in a bit) that, once my merged data is processed, I’m using the Notify processor to set the signal counter associated with the identifier to a value equal to the number of merged flow files. This way, once the notification is received, all the corresponding flow files will be released one by one.

I won’t describe the PutFile processor which is used to store my data (it could be a completely different processor), but just notice the UpdateAttribute I’m using in case of failure:
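The release logic of this second Wait processor can be sketched as follows. This is a simplified Python model, assuming a flow file is released whenever the counter of its ‘merge.uuid’ signal is at least 1 and that each release consumes one unit of the counter; the function name and the sample flow files are hypothetical.

```python
def release_waiting(waiting, signal_counts):
    """Release one waiting flow file per available signal unit, decrementing as we go."""
    released, still_waiting = [], []
    for flow_file in waiting:
        signal_id = flow_file["merge.uuid"]
        if signal_counts.get(signal_id, 0) >= 1:
            signal_counts[signal_id] -= 1
            released.append(flow_file)
        else:
            still_waiting.append(flow_file)
    return released, still_waiting

waiting = [
    {"filename": "a.csv", "merge.uuid": "m1"},
    {"filename": "b.csv", "merge.uuid": "m1"},
    {"filename": "c.csv", "merge.uuid": "m2"},
]
signal_counts = {}
# Notify after the merged flow file 'm1' (built from 2 originals) has been stored.
signal_counts["m1"] = 2
released, still_waiting = release_waiting(waiting, signal_counts)
```

The flow file belonging to ‘m2’ stays in the ‘wait’ relationship until its own merged flow file reaches the Notify processor.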

Screen Shot 2018-06-26 at 2.40.47 PM.png

And the UpdateAttribute I’m using in case of success:

Screen Shot 2018-06-26 at 2.41.24 PM

Note that I’m setting attributes with the same prefix ‘wn_ingestion.*’. Then, look at my Notify processor:

Screen Shot 2018-06-26 at 3.13.05 PM.png

The ‘uuid’ attribute is equal to the ‘merge.uuid’ I mentioned above for the original flow files used in the merge operation. Besides, I’m using the Attribute Cache Regex property to copy the corresponding attributes to all the flow files that are going to be released thanks to this signal. It means that when a merged flow file is sent to the Notify processor, all the original flow files with the corresponding ‘merge.uuid’ in the ‘wait’ relationship will be released and will get the ‘wn*’ attributes of the merged flow file. That’s how I’m “forwarding” the information about when the merged flow file has been sent to the destination, whether the merged flow file has been successfully processed or not, and what filename was used to save the file in the destination.

Once the original flow files are released from the Wait processor, they are sent to the Notify processor with the below configuration:

Screen Shot 2018-06-26 at 3.18.24 PM.png

In this case, I’m using the attribute inherited from the merged flow file sent to the destination to determine if the processing has been successful or not. If yes, then I’m setting the counter name to ‘OK’, otherwise it’s set to ‘KO’. That’s how I’m getting the information about how many CSV files from a given ZIP have been successfully processed or not (remember the ‘wait.counter.OK’ and ‘wait.counter.KO’ attributes created when the flow file corresponding to the ZIP file is released?).

And now… I can execute my SQL query to update my CSV monitoring table with the latest information:

Screen Shot 2018-06-26 at 3.22.09 PM

Demonstration

Let’s see how the workflow is working and what the result is in all kinds of situations. I’m running a Postgres instance in a Docker container to store my monitoring data. Here are the statements I used to create the two tables.

CREATE TABLE IF NOT EXISTS zip_files_processing (
 zip_file_name varchar(255),
 ff_uuid varchar(255),
 ingestion_date timestamp DEFAULT current_timestamp,
 storage_date timestamp,
 number_files integer,
 number_files_OK integer,
 number_files_KO integer,
 status varchar(15)
);
CREATE TABLE IF NOT EXISTS files_ingestion (
 zip_file_name varchar(255),
 file_name varchar(255),
 ff_uuid varchar(255),
 storage_date timestamp,
 storage_name varchar(255),
 status varchar(15)
);
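If you want to try the table lifecycle without NiFi or Postgres, the following Python sketch replays it with an in-memory SQLite database. SQLite only stands in for the Postgres instance, and the INSERT and UPDATE statements are my assumptions of what such queries could look like; they are not copied from the PutSQL configurations shown in the screenshots.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # SQLite stands in for the Postgres instance
conn.execute("""
    CREATE TABLE IF NOT EXISTS zip_files_processing (
        zip_file_name varchar(255),
        ff_uuid varchar(255),
        ingestion_date timestamp DEFAULT current_timestamp,
        storage_date timestamp,
        number_files integer,
        number_files_OK integer,
        number_files_KO integer,
        status varchar(15)
    )
""")

# When the ZIP file is listed: register it as in progress (assumed query).
conn.execute(
    "INSERT INTO zip_files_processing (zip_file_name, ff_uuid, status) VALUES (?, ?, ?)",
    ("data.zip", "uuid-1", "IN_PROGRESS"),
)

# When the Wait processor releases the ZIP flow file: record the outcome (assumed query).
ok, ko = 3, 0
conn.execute(
    """UPDATE zip_files_processing
       SET storage_date = current_timestamp, number_files = ?,
           number_files_OK = ?, number_files_KO = ?, status = ?
       WHERE zip_file_name = ?""",
    (ok + ko, ok, ko, "OK" if ko == 0 else "KO", "data.zip"),
)
row = conn.execute(
    "SELECT number_files, number_files_OK, number_files_KO, status FROM zip_files_processing"
).fetchone()
```

In the workflow, the values of `ok` and `ko` would come from the ‘wait.counter.OK’ and ‘wait.counter.KO’ attributes of the released flow file.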

Case 1 – nominal situation

In this case, all the received data is OK and all is working flawlessly… Let’s say I’m receiving two ZIP files. Once the data is received, I can see something like:

Screen Shot 2018-06-15 at 11.53.58 PM

Then the ZIP files are unpacked, and I can see my CSV files in the other table:

Screen Shot 2018-06-15 at 11.54.42 PM

Then the data is merged and sent to the destination. At the end, my table looks like:

Screen Shot 2018-06-15 at 11.56.28 PM.png

I can see that the data coming from the two ZIP files has been merged into the same final file and everything is OK. In the ZIP monitoring table, I have:

Screen Shot 2018-06-15 at 11.57.55 PM

Case 2 – cannot fetch ZIP file

In this case, the FetchFile processor is failing due to some permission issues. Here is what I get in my ZIP monitoring table:

Screen Shot 2018-06-16 at 12.04.35 AM

Note that I could add a ‘comment’ column with the error cause.

Case 3 – cannot unpack ZIP file

Result will be identical to case 2 as I’m handling the situation in the same way.

Case 4 – cannot merge records because a CSV file is not schema valid

I’m processing 2 ZIP files but a CSV file contained in one of the ZIP files cannot be merged with the others because it does not respect the schema. Note that all the flow files used during a merge operation will be sent to the failure relationship if one of the flow files is not valid. That’s why multiple flow files will be routed to the failure relationship even though there is only one bad file. To avoid this situation, it would be possible to add a validation step before the merge by using the ValidateRecord processor.

For my ZIP files, I have:

Screen Shot 2018-06-22 at 8.42.00 PM

At the end, for this run, I have:

Screen Shot 2018-06-22 at 8.44.01 PM.png

Case 5 – cannot send a merged file to destination

In this case I’m simulating an error when sending a merged flow file to the destination. Here is the result at the end for the ZIP table:

Screen Shot 2018-06-22 at 9.57.09 PM

And the CSV files table:

Screen Shot 2018-06-22 at 9.57.44 PM

Conclusion

I’ve covered a non-exhaustive list of situations and the workflow could be improved, but it gives you a good idea of how the Wait/Notify pattern can be used in a NiFi workflow to help monitor what’s going on when dealing with split / merge situations. In addition to that, it’d be easy to add an automatic mechanism to try replaying failed flow files using the UUID and the provenance repository replay feature of NiFi.

Thanks for reading this post and, as always, feel free to comment and/or ask questions!