NiFi 1.8+ – Revolutionizing the List/Fetch pattern and more…

If you read my post about List/Fetch pattern and if you’re using this approach for some of your workflows, this new feature coming with NiFi 1.8.0 is going to be a revolution.

A quick recap about the context: in NiFi, unless you specifically do something to make the nodes of a NiFi cluster exchange data, a flow file will remain on the same node from the beginning to the end of the workflow. For some use cases, it is necessary to load-balance the flow files among the nodes of the cluster to take advantage of all the nodes (like the List/Fetch pattern).

The only way to load balance data in a NiFi cluster before NiFi 1.8 is to use the Site-to-Site (S2S) protocol in NiFi with a Remote Process Group (RPG) connected to the cluster itself with an input/output port. In addition to that the S2S forces you to have the input/output port defined at the root level of the canvas.  In a multi-tenant environment this can be a bit annoying and can make the workflows a little bit more complex.

What’s the revolution with NiFi 1.8+? For intra-cluster load balancing of the flow files, you now can do it directly by configuring it on the relationship between two processors. It could sound like a minor thing, but that’s HUGE! In addition to that, you have few options to configure the load-balancing which opens up new possibilities for new use cases!

The List/Fetch pattern is described in my previous post: in short… it’s the action to use a first processor (ListX) only running on the primary node to list the available data on X and generate one flow file per object to retrieve on X (the flow file does not have any content but contains the metadata to be used to fetch the object), then flow files are distributed among the NiFi nodes and then the FetchX processor running on all nodes will take care of actually retrieving the data. This way you ensure there is no concurrent access to the same object and you distribute the work in your cluster.

List/Fetch pattern before NiFi 1.8.0

If we have a project A retrieving data from a FTP server using the List/Fetch pattern to push the data into HDFS, it’d look like this:

  • Root Process Group level

Screen Shot 2018-10-18 at 10.03.23 PM.png

  • Inside the Process Group dedicated to Project A

Screen Shot 2018-10-18 at 10.04.01 PM.png

The ListFTP is running on the primary node and sends the data to the RPG which load balances the flow files among the nodes. Flow files are pushed to the input port at the root level and the data can then be moved back down to the process group of the project. Then the FetchFTP actually retrieves the data and the data is sent to HDFS.

List/Fetch pattern with NiFi 1.8+

Now… it looks like this:

  • Root Process Group level

Screen Shot 2018-10-18 at 10.09.29 PM

  • Inside the Process Group dedicated to Project A

Screen Shot 2018-10-18 at 10.10.30 PM

It’s crazy, no? You don’t have anything outside of the process group anymore, the workflow is cleaner/simpler, and authorizations are much easier to manage.

Where is the trick? Did you notice the small icon on the relationship between the ListFTP and the FetchFTP? It looks small but it’s HUGE :).

Load balancing strategies in NiFi 1.8+

Let’s have a look at the configuration of a connection:

Screen Shot 2018-10-18 at 10.18.24 PM.png

There is a new parameter available: the load balance strategy. By default it defaults to “do not load balance” and, unless you need to, you won’t change that parameter (you don’t want to move data between your nodes at each step of the workflow if there is no reason to do so).

Here are the available strategies:

Screen Shot 2018-10-18 at 10.20.24 PM

The Round robin strategy is the one you would probably use in a List/Fetch use case. It will ensure your data is evenly balanced between your nodes.

The Single node strategy allows you to send the data back to one single node. You can see it a bit like a reducer in a MapReduce job: you process the data on all the nodes and then you want to perform a step on a single node. One example could be: I have a zip file containing hundreds of files, I unzip the file on one node, load balance all the flow files (using Round Robin strategy for example) among the nodes, process the files on all the nodes and then send back the flow files to a single node to compress back the data into a single file. It could look like this:

Screen Shot 2018-10-18 at 10.31.08 PM.png

Then you have the Partition by attribute strategy allowing you to have all the flow files sharing the same value for an attribute to be sent on the same node. For example, let’s say you receive data on NiFi behind a load balancer, you might want to have all the data coming from a given group of sources on the same node.

I won’t go into much more details, but feel free to have a look at the documentation. Besides, I’m sure other members of the Apache NiFi community will publish blog posts on this subject… such as this excellent one!

Let’s just give it a try with each strategy… I’m using a GenerateFlowFile (GFF) connected to an UpdateAttribute and we will list the flow files queuing in the relationship to check where the flow files are located. Besides, the GenerateFlowFile is configured to set a random integer between 2 and 4 for the attribute ‘filename’.

Let’s start with a GFF running on the primary node only with no load balancing strategy:

Screen Shot 2018-10-18 at 10.44.06 PM.png

My primary node being my node2 (I have node2, node3 and node4 in my cluster):

Screen Shot 2018-10-18 at 10.45.05 PM.png

I can confirm all the flow files are on the primary node:

Screen Shot 2018-10-18 at 10.46.37 PM.png

Let’s change the Load Balance strategy to Round Robin. We can confirm the data is evenly distributed:

Screen Shot 2018-10-18 at 10.50.02 PM

Let’s change the strategy to One single node. We can confirm the data is now back to a single node (not necessarily the primary node):

Screen Shot 2018-10-18 at 10.53.22 PM.png

And now let’s try the partitioning by attribute using the ‘filename’ attribute. We can confirm that all the flow files sharing the same value in the Filename column are on the same node:

Screen Shot 2018-10-18 at 10.55.21 PM.png

Screen Shot 2018-10-18 at 10.56.12 PM.png

Again, I expect to see additional blog posts on this subject with more technical insights on how it actually works and what you should consider in case your cluster is scaling up and down.

Also… this new feature comes with another one which is as much exciting: the offloading of nodes in a cluster (look at the Admin guide in the documentation). In case you want to decommission a node in a cluster, this will take care of getting back the data on the other nodes before you actually remove the node for good. This will be particularly useful when deploying NiFi on Kubernetes and scaling up and down your cluster!

As always, feel free to comment and/or ask questions! Thanks for reading!

NiFi 1.7+ – Terminate threads

One of the new features coming with NiFi 1.7.0 is the possibility to terminate stuck threads directly from the UI. Before this release, when you had a processor getting stuck (like a custom processor with a deadlock) you had no option but to restart NiFi… and that’s not really great in a multi-tenant setup.

Let’s see this new feature with an example: I’m using a GenerateFlowFile and, using the debug mode of my IDE, I’m going to simulate an issue with the thread by adding a break point in the onTrigger() method of the processor.

Screen Shot 2018-07-02 at 2.56.57 PM.png

When the processor is running, we can see the number of threads currently used by the processor (top right). If the number of tasks is not increasing (and equal to 0 after 5 minutes) and the number of threads is constant, it probably means you have a stuck thread. In this case, it is always recommended to do a thread dump of NiFi to see what is going on. To do that:

./bin/nifi.sh dump /tmp/thread-dump.txt

If we look at the content of the generated file and looking for my GenerateFlowFile processor, we can see something like:

"Timer-Driven Process Thread-5" Id=57 RUNNABLE (suspended)
 at org.apache.nifi.processors.standard.GenerateFlowFile.onTrigger(GenerateFlowFile.java:210)
 at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
 at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
 at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
 at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 Number of Locked Synchronizers: 1
 - java.util.concurrent.ThreadPoolExecutor$Worker@2388307

In this case, the thread is shown as “suspended” at the line where I put my break point in my IDE.

Note that having a stuck thread is usually a symptom of a badly designed processor or an underlying issue with the code dependencies of the processor. Having the thread dump can help locating and fixing the issue.

If trying to stop the processor:

Screen Shot 2018-07-02 at 3.02.23 PM.png

I’ll now see two threads being used by the processor:

Screen Shot 2018-07-02 at 3.03.09 PM

The processor is now in the process of being stopped but you won’t be able to update or restart the processor until it is actually stopped. However, if the initial thread is stuck, you’ll remain in this state forever (and, eventually, you’ll have to restart NiFi if you’re running a version below 1.7.0).

With NiFi 1.7.0, you now have the possibility to terminate the thread (meaning you don’t have to wait for the thread to be completely stopped):

Screen Shot 2018-07-02 at 3.06.58 PM

If I terminate the thread, here is what I’ll see:

Screen Shot 2018-07-02 at 3.08.07 PM

Meaning there is no more active thread and there is one thread being terminated.

Under the hood, the framework is issuing an “interrupt” for the thread and will perform a “reload” of the processor meaning there is a new instance of the processor class being created. This allows the old class to eventually shut down gracefully, close connections, etc. However this needs to be used with care: if the class is maintaining some values in internal variables, this information will be lost in the process (I’m not talking about state information that can be saved at framework level by the processors).

An interrupt is an indication to a thread that it should stop what it is doing and do something else. It’s up to the programmer to decide exactly how a thread responds to an interrupt, but it is very common for the thread to terminate. However if, for some reasons, the thread does not respond to interrupt, then you will keep the thread as being terminated forever (in my above example, I can see the thread staying in the state of being terminated because of my break point).

You can now update the configuration and start again the processor (even if a thread is being terminated). Once the thread is terminated, you’ll get back to a nominal situation:

Screen Shot 2018-07-02 at 4.54.51 PM.png

If you don’t want to setup a debug environment, you can simulate a stuck thread with the following ExecuteScript processor:

Screen Shot 2018-07-02 at 4.56.49 PM.png

If starting/stopping the processor:

Screen Shot 2018-07-02 at 6.02.37 PM.png

And I’ll get the following message when terminating the thread:

Screen Shot 2018-07-02 at 6.02.58 PM.png

You also can use the DebugFlow processor with “@OnStopped Pause Time” set to something like “5 min” and “Ignore Interrupts When Paused” set to “true”:

Screen Shot 2018-07-02 at 6.49.29 PM.png

That’s it for this post! This new feature is really nice in a multi-tenant environment where you can’t afford a restart of the service. If you’re in a situation where you need to use this feature, remember to take a thread dump before actually terminating the thread. This will be really useful to investigate the issue. Also, remember that terminating a thread is not a “normal” operation, it means there is something wrong somewhere and it could very likely happen again.

Thanks for reading this post and, as always, feel free to comment / ask questions!