NiFi 1.8+ – Revolutionizing the List/Fetch pattern and more…

If you read my post about the List/Fetch pattern and you’re using this approach for some of your workflows, this new feature coming with NiFi 1.8.0 is going to be a revolution.

A quick recap of the context: in NiFi, unless you specifically do something to make the nodes of a NiFi cluster exchange data, a flow file will remain on the same node from the beginning to the end of the workflow. For some use cases, it is necessary to load-balance the flow files among the nodes of the cluster to take advantage of all the nodes (as with the List/Fetch pattern).

Before NiFi 1.8, the only way to load balance data in a NiFi cluster was to use the Site-to-Site (S2S) protocol with a Remote Process Group (RPG) connected to the cluster itself through an input/output port. In addition, S2S forces you to define the input/output port at the root level of the canvas. In a multi-tenant environment this can be a bit annoying and can make the workflows a little more complex.

What’s the revolution with NiFi 1.8+? You can now configure intra-cluster load balancing of flow files directly on the relationship between two processors. It might sound like a minor thing, but it’s HUGE! On top of that, you have a few options to configure the load balancing, which opens up new possibilities for new use cases!

The List/Fetch pattern is described in my previous post. In short: a first processor (ListX), running only on the primary node, lists the available data on X and generates one flow file per object to retrieve (the flow file has no content but contains the metadata needed to fetch the object). The flow files are then distributed among the NiFi nodes, and the FetchX processor, running on all nodes, takes care of actually retrieving the data. This way you ensure there is no concurrent access to the same object and you distribute the work across your cluster.
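If it helps to picture it outside of NiFi, here is a rough analogy in plain Python (an illustration of the pattern only, not how NiFi implements it): the listing is done once and produces lightweight metadata, while the fetching is spread across a pool of workers, each object being fetched exactly once.

    from concurrent.futures import ThreadPoolExecutor

    def list_objects():
        # "ListX": runs once (primary node only), emits metadata, no content
        return [{"path": f"/data/file_{i}.csv"} for i in range(10)]

    def fetch_object(meta):
        # "FetchX": runs on every worker/node, retrieves one object's content
        return f"content of {meta['path']}"

    metadata = list_objects()  # the listing is done by a single "node"
    with ThreadPoolExecutor(max_workers=3) as pool:  # 3 "nodes" sharing the work
        results = list(pool.map(fetch_object, metadata))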

List/Fetch pattern before NiFi 1.8.0

If we have a project A retrieving data from an FTP server using the List/Fetch pattern to push the data into HDFS, it’d look like this:

  • Root Process Group level

[Screenshot]

  • Inside the Process Group dedicated to Project A

[Screenshot]

The ListFTP runs on the primary node and sends the data to the RPG, which load balances the flow files among the nodes. Flow files are pushed to the input port at the root level, and the data can then be moved back down into the process group of the project. The FetchFTP then actually retrieves the data, and the data is sent to HDFS.

List/Fetch pattern with NiFi 1.8+

Now… it looks like this:

  • Root Process Group level

[Screenshot]

  • Inside the Process Group dedicated to Project A

[Screenshot]

It’s crazy, no? You no longer have anything outside of the process group, the workflow is cleaner and simpler, and authorizations are much easier to manage.

Where is the trick? Did you notice the small icon on the relationship between the ListFTP and the FetchFTP? It looks small but it’s HUGE :).

Load balancing strategies in NiFi 1.8+

Let’s have a look at the configuration of a connection:

[Screenshot: connection configuration]

There is a new parameter available: the load balance strategy. It defaults to “Do not load balance” and, unless you need to, you won’t change that parameter (you don’t want to move data between your nodes at each step of the workflow if there is no reason to do so).

Here are the available strategies:

[Screenshot: the available load balance strategies]

The Round robin strategy is the one you would probably use in a List/Fetch use case. It ensures your data is evenly balanced across your nodes.

The Single node strategy allows you to send the data back to one single node. You can see it a bit like a reducer in a MapReduce job: you process the data on all the nodes and then you want to perform a step on a single node. One example: I have a zip file containing hundreds of files; I unzip the file on one node, load balance all the flow files (using the Round robin strategy, for example) among the nodes, process the files on all the nodes, and then send the flow files back to a single node to compress the data back into a single file. It could look like this:

[Screenshot: example workflow]

Then you have the Partition by attribute strategy, which sends all the flow files sharing the same value for a given attribute to the same node. For example, let’s say you receive data on NiFi behind a load balancer; you might want to have all the data coming from a given group of sources on the same node.
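To give an intuition of what this means, here is a toy Python sketch of the idea only (NiFi’s actual implementation is more involved and has to cope with nodes joining or leaving the cluster): the attribute value is hashed and mapped to a node, so the same value always lands on the same node.

    # Toy illustration of the "Partition by attribute" idea: flow files sharing
    # the same attribute value always end up on the same node.
    import hashlib

    nodes = ["node2", "node3", "node4"]

    def target_node(attribute_value: str) -> str:
        # Hash the attribute value and map it onto one of the nodes
        digest = hashlib.md5(attribute_value.encode("utf-8")).hexdigest()
        return nodes[int(digest, 16) % len(nodes)]

    for value in ["sourceA", "sourceB", "sourceA", "sourceC"]:
        print(value, "->", target_node(value))  # same value, same node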

I won’t go into much more detail, but feel free to have a look at the documentation. Besides, I’m sure other members of the Apache NiFi community will publish blog posts on this subject… such as this excellent one!
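A side note if you manage your flows programmatically: the load balance strategy can also be set through the NiFi REST API. Below is a minimal sketch using Python and requests; the endpoint and field names reflect my understanding of the 1.8 API (double-check against the REST API documentation), and the URL and connection ID are placeholders.

    import requests

    # Placeholders - adjust to your environment (this assumes an unsecured NiFi;
    # a secured cluster needs an authentication token or certificates).
    NIFI_API = "http://localhost:8080/nifi-api"
    CONNECTION_ID = "your-connection-uuid"

    # Fetch the connection first: updates must include the current revision.
    conn = requests.get(f"{NIFI_API}/connections/{CONNECTION_ID}").json()

    payload = {
        "revision": conn["revision"],
        "component": {
            "id": CONNECTION_ID,
            # Other values: DO_NOT_LOAD_BALANCE, PARTITION_BY_ATTRIBUTE, SINGLE_NODE
            "loadBalanceStrategy": "ROUND_ROBIN",
            # For PARTITION_BY_ATTRIBUTE you would also set:
            # "loadBalancePartitionAttribute": "filename",
        },
    }
    requests.put(f"{NIFI_API}/connections/{CONNECTION_ID}", json=payload).raise_for_status()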

Let’s just give it a try with each strategy… I’m using a GenerateFlowFile (GFF) connected to an UpdateAttribute, and we will list the flow files queued in the connection to check where the flow files are located. Also, the GenerateFlowFile is configured to set a random integer between 2 and 4 for the attribute ‘filename’.
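(If you want to reproduce this setup: one way to get such a value, assuming a dynamic ‘filename’ property added on the GenerateFlowFile and evaluated with the Expression Language, is an expression like ${random():mod(3):plus(2)}.)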

Let’s start with a GFF running on the primary node only with no load balancing strategy:

[Screenshot: GenerateFlowFile connected to UpdateAttribute]

My primary node is node2 (I have node2, node3 and node4 in my cluster):

[Screenshot: cluster view showing node2 as the primary node]

I can confirm all the flow files are on the primary node:

[Screenshot: queued flow files, all on node2]

Let’s change the Load Balance strategy to Round Robin. We can confirm the data is evenly distributed:

[Screenshot: queued flow files evenly distributed across the nodes]

Let’s change the strategy to Single node. We can confirm the data is now back on a single node (not necessarily the primary node):

[Screenshot: queued flow files, all on a single node]

And now let’s try partitioning by attribute using the ‘filename’ attribute. We can confirm that all the flow files sharing the same value in the Filename column are on the same node:

[Screenshots: queued flow files grouped by ‘filename’ value, each value on a single node]

Again, I expect to see additional blog posts on this subject with more technical insights on how it actually works and what you should consider in case your cluster is scaling up and down.

Also… this new feature comes with another one which is just as exciting: the offloading of nodes in a cluster (have a look at the Admin Guide in the documentation). In case you want to decommission a node in a cluster, this will take care of moving the node’s data to the other nodes before you actually remove the node for good. This will be particularly useful when deploying NiFi on Kubernetes and scaling your cluster up and down!

As always, feel free to comment and/or ask questions! Thanks for reading!

16 thoughts on “NiFi 1.8+ – Revolutionizing the List/Fetch pattern and more…”

  1. Hi Pierre, first of all thank you so much for your postings in general, they are very helpful! Two short questions about NiFi 1.8.0. NiFi has some new properties, I’ve listed them below. Can you explain what “nifi.cluster.load.balance.connections.per.node” does? We have an 8-node cluster, should we work with the default value?

    nifi.properties:
    # cluster load balancing properties #
    nifi.cluster.load.balance.host=
    nifi.cluster.load.balance.port=6342
    nifi.cluster.load.balance.connections.per.node=4
    nifi.cluster.load.balance.max.thread.count=8
    nifi.cluster.load.balance.comms.timeout=30 sec

    Second question: in authorizers.xml there is a new property called “Node Group”. We tried to set a text there, but the secured cluster doesn’t start anymore. What is the meaning of it and where do we have to define this group to get NiFi up and running?

    Thanks. Josef

    • Hi Josef, thanks for your comment.

      The number of threads allocated to load balancing is per node and should be sized based on how much you’re going to use that feature (in terms of how many flow files and the size of the flow files). I’d work with the defaults for now.

      Regarding the “Node Group”, it makes sense if the NiFi nodes are known in your LDAP/AD and belong to a group. It’s useful to define policies at group level for the nodes so that policies are automatically applied if you scale down or up your cluster (granted you added the nodes in the LDAP group before adding the node to the cluster).

      Pierre

  2. Thank you! Load balancing is now working much better than in the previous version! And of course it’s much simpler to configure 🙂

  3. Hi Pierre, thank you so much for the excellent document! But I have a question… I am ingesting data from Oracle to PostgreSQL using NiFi.

    We have a huge amount of data, continuously generated by the sources, and I want to update and delete records using NiFi (historical data).

    My flow is: QueryDatabaseRecord -> UpdateAttribute -> PutDatabaseRecord

    When a matching record is found in the final table, define which action you need to take: either update or delete.
    If the record is not matched in the final dataset, then insert the record.
    Appreciate your help.

      • Hi Pierre, I am importing the data from Oracle to PostgreSQL and most of the time I am working on updates and deletes. Here I have two questions…
        1) If I am updating the records with modified_date using sysdate, I am getting the records; but if I am updating the records with sysdate - 1, I am not getting the records. How will I get the records updated with a date earlier than the current date using NiFi?

        2) If deletes happen in the source, how will that affect the target? I need to delete the records in the target as well, and store the deleted records in some other table in the target for backup. How can I handle the deleted records using NiFi?

  4. Hi Pierre, I have installed NiFi in a 3-node cluster with Kerberos. Now I am accessing the NiFi UI through a proxy configuration, i.e., with the IP address and port I can see the NiFi login page, but as soon as I log in to NiFi it redirects me to some other URL, i.e., the host (the IP address) gets replaced or redirected to some other host. If I replace the URL with the IP address again it works, but if I click on some other path it again redirects to some other URL.
    I am pretty sure it can be fixed by using nifi.web.proxy.host and nifi.web.proxy.context.path but I don’t know how to use them. I referred to the NiFi Admin Guide but I am still not getting what value I should give to those parameters.
    Can you tell me exactly how to use those parameters and what their values should be?

    Thanks

  5. Hi, it’s cool, but this feature has an issue with the NiFi Registry: NiFi detects it as a change, but an update on another group does not set this strategy.
    FYI

  6. Hi Pvillard, thanks for such a great article. I am trying to load balance data on a NiFi 1.8 3-node cluster, but I am getting this error:
    2019-06-25 07:45:40,486 ERROR [Load-Balanced Client Thread-3] o.a.n.c.q.c.c.a.n.NioAsyncLoadBalanceClient Unable to connect to bisnorsdaiapn03.THREEDEV.LOCAL:9443 for load balancing
    java.net.ConnectException: Connection timed out
    Could you please guide me on what the reason for it might be?

  7. Yes, the port is open. I have set port 6342 in the cluster load balance settings, but I have no idea why it is using 9443, which is the UI port. Do I need to set an access policy in the UI for every node? nifi.properties settings are as below:
    # cluster node properties (only configure for cluster nodes) #
    nifi.cluster.is.node=true
    nifi.cluster.node.address=
    nifi.cluster.node.protocol.port=11443
    nifi.cluster.node.protocol.threads=10
    nifi.cluster.node.protocol.max.threads=50
    nifi.cluster.node.event.history.size=25
    nifi.cluster.node.connection.timeout=60 sec
    nifi.cluster.node.read.timeout=60 sec
    nifi.cluster.node.max.concurrent.requests=400
    nifi.cluster.firewall.file=
    nifi.cluster.flow.election.max.wait.time=5 mins
    nifi.cluster.flow.election.max.candidates=3

    # cluster load balancing properties #
    nifi.cluster.load.balance.host=
    nifi.cluster.load.balance.port=6342
    nifi.cluster.load.balance.connections.per.node=4
    nifi.cluster.load.balance.max.thread.count=8
    nifi.cluster.load.balance.comms.timeout=360 sec ##increased this from 30 secs to 360

  8. Hi Pierre! Thank you for your explanation of the List/Fetch technique, it really works! Could you also share a working example of the TailFile technique? I have machine-generated log files, split on an hourly basis. Files are updated every minute with new data, and when the hour ends, it stops writing to one file and generates a new file with the next hour’s suffix in the name for the next hour’s data, something like “equipm-data-DDMMYYYY-HH24.log”. The List/Fetch technique is working for my case, but with a 1 hour lag in data delivery. All my attempts to change the strategy and to implement TailFile in Multiple File mode (using a RegExp) were without success. Thank you in advance!

    • Yes, I think the TailFile processor expects that the file where the logs are appended (say myfile.log) is constant and then rolled over to a different file name (say myfile.log-DDMMYYYY-HH24) while logs keep being appended to myfile.log. I’d have to check again how the processor works for your situation. Otherwise, can’t you change the mechanism in charge of the logging? (I know it might be more difficult in terms of change management.)
