Apache NiFi 1.0.0 – Cluster setup

As you may know, version 1.0.0-BETA of Apache NiFi was released a few days ago. The upcoming 1.0.0 release will be a great moment for the community, as it will mark a lot of work over the last few months, with many new features being added.

The objective of the Beta release is to give people a chance to try this new version and to give feedback before the official major release, which will come shortly. If you want to preview this new version with its completely new look, you can download the binaries here, unzip them, and run NiFi ('./bin/nifi.sh start' or './bin/run-nifi.bat' on Windows), then simply access http://localhost:8080/nifi/.
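
If you prefer doing this from the command line, here is a minimal sketch on Linux/macOS, assuming the binary archive has already been downloaded (the archive name below is an example; adjust it to the file you actually grabbed):

# unpack the downloaded archive (file name may differ depending on the build)
unzip nifi-1.0.0-BETA-bin.zip
cd nifi-1.0.0-BETA

# start NiFi, then open http://localhost:8080/nifi/ in your browser
./bin/nifi.sh start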

The objective of this post is to briefly explain how to set up an unsecured NiFi cluster with this new version (a post about setting up a secured cluster will come shortly, with explanations on how to use a new tool shipped with NiFi to ease the installation of a secured cluster).

One really important change with this new version is the new paradigm around cluster installation. From the NiFi documentation, we can read:

Starting with the NiFi 1.0 release, NiFi employs a Zero-Master Clustering paradigm. Each of the nodes in a NiFi cluster performs the same tasks on the data but each operates on a different set of data. Apache ZooKeeper elects one of the nodes as the Cluster Coordinator, and failover is handled automatically by ZooKeeper. All cluster nodes report heartbeat and status information to the Cluster Coordinator. The Cluster Coordinator is responsible for disconnecting and connecting nodes. As a DataFlow manager, you can interact with the NiFi cluster through the UI of any node in the cluster. Any change you make is replicated to all nodes in the cluster, allowing for multiple entry points to the cluster.

[Diagram: Zero-Master Clustering]

OK, let's start with the installation. As you may know, it is strongly recommended to use an odd number of ZooKeeper instances, with at least 3 nodes, to maintain a majority, also called a quorum. NiFi comes with an embedded instance of ZooKeeper, but you are free to use an existing ZooKeeper cluster if you want. In this article, we will use the embedded ZooKeeper option.

I will use my computer as the first instance. I also launched two virtual machines (with a minimal CentOS 7). All my 3 instances are able to communicate with each other on the required ports. On each machine, I configure my /etc/hosts file with:

192.168.1.17 node-3
192.168.56.101 node-2
192.168.56.102 node-1

I deploy the binaries on my three instances and unzip them. I now have a NiFi directory on each of my nodes.
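
As an illustration, assuming SSH access from my computer to the two VMs, copying and unpacking the archive could look like this (the archive name and destination paths are just examples):

# copy the archive to the two VMs and unpack it there
scp nifi-1.0.0-BETA-bin.zip node-1:~/
scp nifi-1.0.0-BETA-bin.zip node-2:~/
ssh node-1 'unzip ~/nifi-1.0.0-BETA-bin.zip'
ssh node-2 'unzip ~/nifi-1.0.0-BETA-bin.zip'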

The first thing is to configure the list of the ZK (ZooKeeper) instances in the configuration file ‘./conf/zookeeper.properties‘. Since our three NiFi instances will run the embedded ZK instance, I just have to complete the file with the following properties:

server.1=node-1:2888:3888
server.2=node-2:2888:3888
server.3=node-3:2888:3888

Then, everything happens in the './conf/nifi.properties' file. First, I specify that NiFi must run an embedded ZK instance, with the following property:

nifi.state.management.embedded.zookeeper.start=true

I also specify the ZK connect string:

nifi.zookeeper.connect.string=node-1:2181,node-2:2181,node-3:2181

As you may have noticed, the ./conf/zookeeper.properties file has a property named dataDir. By default, this value is set to ./state/zookeeper. When more than one NiFi node runs an embedded ZK, it is important to tell each server which one it is.

To do that, you need to create a file named myid and place it in ZK's data directory. The content of this file should be the index of the server as previously specified by the corresponding server.<n> property.

On node-1, I’ll do:

mkdir ./state
mkdir ./state/zookeeper
echo 1 > ./state/zookeeper/myid

The same operation needs to be done on each node (don’t forget to change the ID).
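
For instance, on node-2 and node-3 (the index must match the corresponding server.<n> entry in zookeeper.properties):

# on node-2
mkdir -p ./state/zookeeper
echo 2 > ./state/zookeeper/myid

# on node-3
mkdir -p ./state/zookeeper
echo 3 > ./state/zookeeper/myid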

If you don’t do this, you may see the following kind of exceptions in the logs:

Caused by: java.lang.IllegalArgumentException: ./state/zookeeper/myid file is missing

Next come the clustering properties. For this article, we are setting up an unsecured cluster, so we must keep:

nifi.cluster.protocol.is.secure=false

Then, we have the following properties:

nifi.cluster.is.node=true
nifi.cluster.node.address=node-1
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

I set the FQDN of the node I am configuring, and I choose an arbitrary port (9999) for the communication with the elected cluster coordinator. I apply the same configuration on my other nodes:

nifi.cluster.is.node=true
nifi.cluster.node.address=node-2
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

and

nifi.cluster.is.node=true
nifi.cluster.node.address=node-3
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

We have configured the exchanges between the nodes and the cluster coordinator; now let's move on to the exchanges between the nodes themselves (to balance the flow data). We have the following properties:

nifi.remote.input.host=node-1
nifi.remote.input.secure=false
nifi.remote.input.socket.port=9998
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec

Again, I set the FQDN of the node I am configuring, and I choose an arbitrary port (9998) for the Site-to-Site (S2S) exchanges between the nodes of my cluster. The same applies to all the nodes; just change the host property to the correct FQDN, as shown below.
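
For reference, on the other nodes only the host property changes (the rest stays identical):

# on node-2
nifi.remote.input.host=node-2

# on node-3
nifi.remote.input.host=node-3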

It is also important to set the FQDN for the web server property, otherwise we may get strange behaviors with all nodes identified as ‘localhost’ in the UI. Consequently, for each node, set the following property with the correct FQDN:

nifi.web.http.host=node-1

And that’s all! Easy, isn’t it?

OK, let’s start our nodes and let’s tail the logs to see what’s going on there!

./bin/nifi.sh start && tail -f ./logs/nifi-app.log

If you look at the logs, you should see that one of the nodes gets elected as the cluster coordinator, and then you should see heartbeats created by the three nodes and sent to the cluster coordinator every 5 seconds.
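
A quick way to see which node has been elected is to search the application log for coordinator-related messages (the exact wording of the log entries may vary between versions):

grep -i "coordinator" ./logs/nifi-app.log | tail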

You can connect to the UI using any node you want (multiple users can be connected to different nodes; modifications will be applied on every node). Let's go to:

http://node-2:8080/nifi

Here is what it looks like:

[Screenshot: the NiFi UI showing the 3-node cluster]

As you can see in the top-left corner, there are 3 nodes in our cluster. Besides, if we go to the menu (button in the top-right corner) and select the cluster page, we get details about our three nodes:

[Screenshot: the Cluster page listing the three nodes]

We see that my node-2 has been elected as the cluster coordinator, and that my node-3 is the primary node. This distinction is important because some processors must run on a single node (for data consistency), and in that case we will want them to run "On primary node" (example below).

We can display details on a specific node (“information” icon on the left):

[Screenshot: details of a specific node]

OK, let's add a processor like GetTwitter. Since the flow will run on all nodes (with the data balanced between the nodes), this processor must run on a single node if we don't want to duplicate data. So, in the scheduling strategy, we choose "On primary node". This way, we don't duplicate data, and if the primary node changes (because the node dies or gets disconnected), we won't lose data; the workflow will still be executed.

[Screenshot: GetTwitter processor scheduled "On primary node"]

Then I can connect my processor to a PutFile processor to save the tweets in JSON by setting a local directory (/tmp/twitter):

[Screenshot: GetTwitter connected to a PutFile processor]

If I run this flow, all my JSON tweets will be stored on the primary node; the data won't be balanced. To balance the data, I need to use an RPG (Remote Process Group): the RPG will connect to the coordinator to evaluate the load of each node and balance the data across the nodes. This gives us the following flow:

[Screenshot: the flow with the RPG input port, Remote Process Group, GetTwitter and PutFile]

I added an input port called "RPG", then added a Remote Process Group that I connected to "http://node-2:8080/nifi", and I enabled transmission so that the Remote Process Group becomes aware of the existing input ports on my cluster. Then, in the Remote Process Group configuration, I enabled the RPG input port. I connected my GetTwitter processor to the Remote Process Group and selected the RPG input port. Finally, I connected my RPG input port to my PutFile processor.

When running the flow, the data is now balanced across all my nodes (I can check the local directory '/tmp/twitter' on each node).
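
A quick way to verify the balancing, assuming SSH access to the nodes, is to count the files written on each of them (directory and hostnames as configured above):

for node in node-1 node-2 node-3; do
  echo -n "$node: "
  ssh $node 'ls /tmp/twitter | wc -l'
done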

That's all for this post. I hope you enjoyed it and that it will be helpful if you are setting up a NiFi cluster. All comments/remarks are very welcome, and I kindly encourage you to download Apache NiFi, try it, and give feedback to the community.

35 thoughts on “Apache NiFi 1.0.0 – Cluster setup”

  1. This is an awesome tutorial. Thank you so much Pierre.
    I need clarification.
    So I tried shutting down 2 nodes and running on a single node in the cluster. It seems like it doesn't work, meaning ZooKeeper would not know which node to elect as coordinator and primary. Is that so? When I checked the logs I keep getting an "unable to connect to ZooKeeper" exception.

    • I am facing the same problem ==> when using the embedded ZK, if one node shuts down, the cluster is unable to elect a new coordinator.
      Pretty odd … did you find a way to fix the issue?

    • I am facing the same issue when I use the embedded ZooKeeper. Is there any solution to this?
      (If one server goes down, all the others go down.)

      • I'd recommend asking this question on the Apache NiFi users mailing list (but I believe this is related to the fact that you only have 2 ZK nodes left and the election might not complete successfully – you don't have an odd number of ZK nodes). However, please note that for a NiFi production cluster, using the embedded ZK is not recommended.

  2. Thanks Pierre, quick question regarding the RPG. It looks like you are only specifying the current coordinator. What happens if ZooKeeper changes the current coordinator to another node? Will the RPG be smart enough to still balance across all nodes?

    • Hi David. Once the Site-to-Site communication is established, the client will know about the other nodes of the cluster. Even if the coordinator/endpoint goes down, the client will update its status and the cluster topology by talking to other nodes. A client also caches the remote peers' info and even persists it to a local file so that it can be loaded when the client node restarts.

  3. Any suggestion on setting this up using EC2, with a separate machine for each node, so that the cluster coordinator acknowledges the other machines (the cluster) when nifi.sh starts? Thank you

    • Thank you, great article. Please assist with the same question 'cloud-user' asked: if I want to stand up 3 EC2 instances on the fly, will ZooKeeper know how to map these IPs? As opposed to standing up the 3 machines first, getting the static IPs, and mapping them into /etc/hosts as you describe above. Thanks

  4. Great tutorial. One piece of feedback: could you please state that for Site-to-Site one should use the hostname/IP of each individual machine (I presumed you had to point the remote address on all machines to node-1).

  5. Thank you. How would you use three EC2 instances, using the embedded ZooKeeper on each machine to determine the cluster? It looks like in your sample you have to spin up the 3 virtual machines first and then map the IPs into /etc/hosts, right? Thanks

  6. Hello: how would you implement this when using three different EC2 machines (3 IPs) with ZooKeeper intact? In the above article, you have 3 IPs mapped into /etc/hosts. So are those 3 IPs obtained after you start the servers, or do you set things up to grab them on the fly? Thanks

    • Hi kludnext, in the case of EC2 instances, I believe you have public FQDNs and a DNS that allows each node to resolve the others. In that case, there is no need to change /etc/hosts.

  7. Thank you for a great article. A question regarding spinning up 3 different machines: each one will have a node, and they connect via ZooKeeper. Does each instance need to run the embedded ZooKeeper?

  8. Hey, I did all the steps that you mentioned here, but ZooKeeper was unable to elect a leader and I am getting the following error message.

    2018-04-19 15:56:55,209 ERROR [Curator-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl Background operation retry gave up
    org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:728)
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:857)
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
    at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

      • Hey, thanks for the reply. I forgot to disable iptables, since I didn't open any ports. It's working now.

  9. Hi, I followed your tutorial; it was all very clear and explanatory. I am running a 3-node cluster on Google Cloud Platform, each node on a separate VM. I can start NiFi on each instance, a node is elected, and the heartbeats come in as expected, but in the UI I get "an unexpected error has occurred, please check the logs for additional details". I can't see *anything* in the logs that would explain this…

  10. This is a very nice blog post on NiFi clustering, but I am getting the error below while running NiFi on any of the 3 servers.

    ERROR [node-1/x.x.x.x:3888] o.a.z.server.quorum.QuorumCnxManager Exception while listening
    java.net.BindException: Cannot assign requested address (Bind failed)

    ERROR [node-2/x.x.x.x:3888] o.a.z.server.quorum.QuorumCnxManager Exception while listening
    java.net.BindException: Cannot assign requested address (Bind failed)

    ERROR [node-3/x.x.x.x:3888] o.a.z.server.quorum.QuorumCnxManager Exception while listening
    java.net.BindException: Cannot assign requested address (Bind failed)

  11. OK, I have resolved the port issue; now it is throwing the error below.

    ERROR [Curator-Framework-0] o.a.c.f.imps.CuratorFrameworkImpl Background retry gave up
    org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss

  12. Hi, thanks for this wonderful post.
    I have created 2 NiFi instances on Google Cloud Platform. I now plan to bring them into a cluster. However, I am getting this error:

    server.quorum.QuorumPeerConfig does not have the form host:port or host:port:port or host:port:port:type

    I have checked my zookeeper.properties file again and again but I am not sure what the issue is. My zookeeper.properties file is as follows; I am not sure why this issue is cropping up. My NiFi servers are named nifi-server2 and nifi-server1 and the ports are 2888 and 3888.

    server.1=nifi-server2:2888:3888

    server.2=nifi-server1:2888:3888

    # server.3=nifi-node3-hostname:2888:3888

    • OK, I guess I fixed this. The zookeeper.properties file had a last line which should actually be deleted, and that was causing the issue.
