Scaling up/down a NiFi cluster

Scaling up – add a new node in the cluster

You have a NiFi cluster and you want to increase its throughput by adding a new node? Here is a way to do it without restarting the cluster. This post starts with a three-node secured cluster with embedded ZooKeeper, and we are going to add a new node to this cluster (this new node won’t run an embedded ZK).

The node I’m going to add is a CentOS 7 virtual machine running an Apache NiFi 1.1.0 instance. All prerequisites are met (network, firewall, Java, etc.) and I have uncompressed the NiFi and NiFi toolkit binary archives.

The first thing to do, if you don’t have DNS resolution, is to update the /etc/hosts file on all the nodes so that each node can resolve the others.
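As a minimal sketch of that step, here is a small helper that adds an entry to a hosts file only when the hostname is not already mapped. The helper name add_host and the IP addresses in the example are hypothetical placeholders, not values from my environment.

```shell
#!/bin/sh
# add_host FILE IP NAME: append "IP NAME" to FILE unless NAME is already mapped.
# (Hypothetical helper, for illustration only.)
add_host() {
  hosts_file=$1 ip=$2 name=$3
  # Look for NAME as a whole field (after the IP column) on a non-comment line.
  if awk -v n="$name" '
       /^[[:space:]]*#/ { next }
       { for (i = 2; i <= NF; i++) if ($i == n) found = 1 }
       END { exit !found }
     ' "$hosts_file"; then
    return 0
  fi
  printf '%s %s\n' "$ip" "$name" >> "$hosts_file"
}

# Example, to run as root on every node (addresses are placeholders):
# add_host /etc/hosts 192.168.56.101 node-1
# add_host /etc/hosts 192.168.56.102 node-2
# add_host /etc/hosts 192.168.56.103 node-3
# add_host /etc/hosts 192.168.56.104 node-4
```

Running it twice with the same arguments leaves a single entry, so it is safe to replay on nodes that are already partly configured.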

Then, I want to issue a certificate signed by my Certificate Authority for my new node. For this purpose, I use the NiFi toolkit and issue a CSR against my running CA server (see secured cluster setup). I created a directory /etc/nifi, and from this directory I ran the command:

.../bin/tls-toolkit.sh client -c node-3 -t myTokenToUseToPreventMITM -p 9999

node-3 being the node where my CA server is running.

It generates:

  • config.json
  • keystore.jks
  • nifi-cert.pem
  • truststore.jks

Then I can configure my new node with the following properties in ./conf/nifi.properties (to match the configuration used on the running cluster):

nifi.cluster.protocol.is.secure=true
nifi.cluster.is.node=true
nifi.cluster.node.address=node-4
nifi.cluster.node.protocol.port=9998
nifi.remote.input.host=node-4
nifi.remote.input.secure=true
nifi.remote.input.socket.port=9997
nifi.web.https.host=node-4
nifi.web.https.port=8443
nifi.zookeeper.connect.string=node-1:2181,node-2:2181,node-3:2181

And the following properties based on the information from the generated config.json file:

nifi.security.keystore=/etc/nifi/keystore.jks
nifi.security.keystoreType=jks
nifi.security.keystorePasswd=JBJUu5MGhGdBj6pTZkZaBR2jRijtcjEMlwqnqc9WkMk
nifi.security.keyPasswd=JBJUu5MGhGdBj6pTZkZaBR2jRijtcjEMlwqnqc9WkMk
nifi.security.truststore=/etc/nifi/truststore.jks
nifi.security.truststoreType=jks
nifi.security.truststorePasswd=gQ7Bw9kbkbr/HLZMxyIvq3jtMeTkoZ/0anD1ygbtSt0
nifi.security.needClientAuth=true
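If you prefer not to edit the file by hand, the sketch below shows one possible way to set these properties from a script. The helper name set_prop is hypothetical, and the values in the example are placeholders to replace with the ones found in your own config.json.

```shell
#!/bin/sh
# set_prop FILE KEY VALUE: rewrite the "KEY=..." line in FILE, or append it.
# (Hypothetical helper.) Key and value go through the environment so that
# characters such as "/" and "+" in generated passwords are written literally.
set_prop() {
  file=$1
  tmp=$(mktemp) || return 1
  PROP_KEY=$2 PROP_VALUE=$3 awk '
    BEGIN { k = ENVIRON["PROP_KEY"]; v = ENVIRON["PROP_VALUE"] }
    index($0, k "=") == 1 { print k "=" v; found = 1; next }
    { print }
    END { if (!found) print k "=" v }
  ' "$file" > "$tmp" && cat "$tmp" > "$file"
  status=$?
  rm -f "$tmp"
  return $status
}

# Example (placeholder values, to adapt to your own config.json):
# set_prop ./conf/nifi.properties nifi.security.keystore /etc/nifi/keystore.jks
# set_prop ./conf/nifi.properties nifi.security.keystoreType jks
# set_prop ./conf/nifi.properties nifi.security.keystorePasswd "$KEYSTORE_PASSWORD"
```

The file is rewritten in place with `cat` rather than replaced with `mv`, so its ownership and permissions stay as they were.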

Then we need to configure the ./conf/authorizers.xml file to specify the initial admin identity and the nodes identities:

<authorizers>
    <authorizer>
        <identifier>file-provider</identifier>
        <class>org.apache.nifi.authorization.FileAuthorizer</class>
        <property name="Authorizations File">./conf/authorizations.xml</property>
        <property name="Users File">./conf/users.xml</property>
        <property name="Initial Admin Identity">CN=pvillard, OU=NIFI</property>
        <property name="Legacy Authorized Users File"></property>

        <property name="Node Identity 1">CN=node-1, OU=NIFI</property>
        <property name="Node Identity 2">CN=node-2, OU=NIFI</property>
        <property name="Node Identity 3">CN=node-3, OU=NIFI</property>
    </authorizer>
</authorizers>

IMPORTANT: the authorizers.xml file must match the file on the other nodes. We do not specify the Node Identity of the new node here; this is handled once the node has joined the cluster. Also, before starting your new node, delete the ./conf/flow.xml.gz file so that the node picks up the current flow definition from the running cluster.
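These two checks can be scripted before the first start. The sketch below is only one way to do it: the function name prepare_new_node is hypothetical, it assumes you have copied authorizers.xml from an existing node to a local reference path, and it renames flow.xml.gz instead of deleting it so that the file can be restored if needed.

```shell
#!/bin/sh
# prepare_new_node NIFI_HOME REFERENCE_AUTHORIZERS   (hypothetical helper)
# 1. Fails if conf/authorizers.xml differs from the reference copy.
# 2. Moves an existing conf/flow.xml.gz aside so that the node takes
#    the flow definition from the running cluster.
prepare_new_node() {
  conf_dir=$1/conf
  reference=$2
  if ! cmp -s "$conf_dir/authorizers.xml" "$reference"; then
    echo "authorizers.xml differs from $reference" >&2
    return 1
  fi
  if [ -f "$conf_dir/flow.xml.gz" ]; then
    mv "$conf_dir/flow.xml.gz" "$conf_dir/flow.xml.gz.bak" || return 1
  fi
}

# Example (the reference path is a placeholder):
# prepare_new_node /path/to/nifi /tmp/authorizers-from-node-1.xml && ./bin/nifi.sh start
```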

At this point, you can start the new node:

./bin/nifi.sh start && tail -f ./logs/nifi-app.log

And you will be able to see in the cluster view in the NiFi UI that your new node has joined the cluster:

screen-shot-2016-11-29-at-9-19-37-pm

You now need to add the new user corresponding to the new node:

screen-shot-2016-11-29-at-9-20-44-pm

screen-shot-2016-11-29-at-9-21-20-pm

And then update the policies to grant the “proxy user requests” write access to the new node.

screen-shot-2016-11-29-at-9-21-37-pm

Now I want to confirm that my new node is used in the workflow processing. I am going to create a simple workflow that generates flow files on the primary node, distributes the load across all the nodes of the cluster and stores the generated files in /tmp. To do that, I first need to grant myself the rights to modify the root process group content:

Click on the key symbol on the canvas and grant yourself all the access.

screen-shot-2016-11-29-at-9-26-59-pm

Then, to simplify the next tasks (granting permissions to nodes for site-to-site communication), I recommend creating a group called “nodes” and adding all the node users to this group:

screen-shot-2016-11-29-at-9-42-19-pm

Then I create my workflow with a Remote Process Group and an Input Port to ensure the load balancing:

screen-shot-2016-11-29-at-9-43-48-pm

And I also specify that my GenerateFlowFile processor is running on primary node only:

screen-shot-2016-11-29-at-9-44-05-pm

Besides, I add specific rights to the Input Port to allow site-to-site communication between nodes. To do that, select the Input Port, then click on the key symbol in the Operate Panel and grant “receive site-to-site data” to the “nodes” group.

I can now start my workflow and confirm that data is stored in /tmp on each one of my nodes. I now have a new node participating in the execution of the workflow by the NiFi cluster!

Scaling down – decommission a node of the cluster

OK… now we want to check how a node can be decommissioned. There are many reasons to be in this situation, but the most common ones are:

  • upgrading the cluster in a rolling fashion (decommission a node, upgrade the node, bring the node back into the cluster and move on to the next one)
  • adding a new processor NAR to the cluster (decommission a node, add the NAR to the node’s library, restart NiFi on this node, bring the node back into the cluster and move on to the next one)

To do that, you just need to go to the cluster view, disconnect your node, stop your node, perform your modifications and restart the node to get it back in the cluster. Coordinator and primary roles will be automatically assigned to another node if needed.
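The disconnect step can also be scripted. The sketch below rests on my reading of the NiFi REST API, where a PUT on /nifi-api/controller/cluster/nodes/{id} with a status of DISCONNECTING asks the cluster to disconnect a node. Please check it against the REST API documentation of your version before relying on it. The function names and the CLIENT_CERT, CLIENT_KEY and CA_CERT variables (PEM files for an authorized user) are hypothetical.

```shell
#!/bin/sh
# node_status_payload NODE_ID STATUS: JSON body asking NiFi to change the
# state of a node (assumed shape of the request entity).
node_status_payload() {
  printf '{"node":{"nodeId":"%s","status":"%s"}}' "$1" "$2"
}

# set_node_status BASE_URL NODE_ID STATUS
# CLIENT_CERT, CLIENT_KEY and CA_CERT are hypothetical variables pointing
# at PEM files for a user allowed to modify the controller.
set_node_status() {
  curl --silent --show-error --fail \
    --cert "$CLIENT_CERT" --key "$CLIENT_KEY" --cacert "$CA_CERT" \
    -X PUT -H 'Content-Type: application/json' \
    -d "$(node_status_payload "$2" "$3")" \
    "$1/nifi-api/controller/cluster/nodes/$2"
}

# Example: send the request through a node that stays in the cluster.
# NODE_ID is the identifier NiFi assigned to the node to disconnect.
# set_node_status https://node-2:8443 "$NODE_ID" DISCONNECTING
```

Once the node is disconnected, the rest of the procedure (stop, modify, restart) stays the same as in the UI-driven version.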

But let’s say we are in a more “destructive” situation (removing a node for good) and we want to ensure that the data currently being processed by the node we are decommissioning is correctly processed before shutting down NiFi on this node.

For this purpose, I am going to change the end point of my workflow to a PutHDFS instead of a PutFile and send the data to an external cluster. The objective is to check that all the generated data goes into HDFS without any issue.

Here is my workflow:

screen-shot-2016-11-30-at-9-16-54-am

And here is the current status of my cluster:

screen-shot-2016-11-30-at-9-18-29-am

My GenerateFlowFile is running on the primary node only (node-1) and I’ll generate a file every 100ms. I’ll decommission node-1 to also demonstrate the change of roles.

As a note: when you define a remote process group, you need to specify one of the nodes of the remote cluster you want to send data to. This specific node is only used when you start the remote process group, to connect to the remote cluster. Once connected, the RPG is aware of all the nodes of the remote cluster and can send data to all of them even if the specified remote node is disconnected. In the near future, it will be possible to specify multiple remote nodes in order to be more resilient (in case the RPG is stopped/restarted and the specified remote node is no longer available).

To decommission a node, I only need to click on the “disconnect” button in the cluster view for the node I want to disconnect.

Before disconnecting my node, I can check that my workflow is running correctly and that I have flow files queued in my relationships:

screen-shot-2016-11-30-at-9-31-46-am

I can now disconnect my node-1 and check what is going on. Roles have been reassigned and the workflow is still running:

screen-shot-2016-11-30-at-9-33-43-am

screen-shot-2016-11-30-at-9-33-55-am

Note that, as I explained, the remote process group is still working as expected even though it was using node-1 as remote node (see note above).

I can now access the UI of the node I just disconnected and check the current status of this node (https://node-1:8443/nifi). When I access the node, I get the following warning:

screen-shot-2016-11-30-at-9-35-15-am

And I can see that the workflow is still running on this node. This way, I can wait until all flow files currently on this node are processed:

screen-shot-2016-11-30-at-9-35-27-am

My GenerateFlowFile is not running anymore since my node lost its Primary Node role, and we can wait for the end of the processing of all flow files on this node. Once all flow files are processed, we have the guarantee that there will be no data loss and we can shut down the node if needed.

If you want to reconnect it once you have performed your updates, you just need to go to the UI (from a node in the cluster), open the cluster view and reconnect the node:

screen-shot-2016-11-30-at-9-33-43-am

The node is now reconnected and used in the workflow processing:

screen-shot-2016-11-30-at-9-43-24-am

I can check that all the files I’ve generated have been transferred, as expected, to my HDFS cluster.

screen-shot-2016-11-30-at-9-45-25-am

That’s all for this blog! As you can see, scaling your NiFi cluster up and down is really easy, and it gives you the opportunity to avoid any service interruption.

As always, comments and suggestions are welcome.

Apache NiFi 1.1.0 – Secured cluster setup

Apache NiFi 1.1.0 is now out, and I want to discuss a specific subject in a couple of posts: how to scale a NiFi cluster up and down without losing data. Before going into this subject, I want to set up a three-node secured cluster using the NiFi toolkit. It will be my starting point to scale up my cluster with an additional node, and then scale it down.

There are already great posts describing how to set up a secured cluster using embedded ZK and taking advantage of the NiFi toolkit for the certificates, so I won’t go into too much detail. For reference, here are some great articles I recommend:

OK… let’s move on. My initial setup is the following:

  • My OS X laptop
  • 2 CentOS 7 VMs

All nodes meet the required prerequisites (network, Java, etc.) and have the NiFi 1.1.0 and NiFi toolkit 1.1.0 binary archives uncompressed and available.

On my OS X laptop, I will use the NiFi TLS toolkit in server mode so that it acts as a Certificate Authority that can be used by clients to get Certificates.

Here is the description of how to use the TLS toolkit in server mode:

./bin/tls-toolkit.sh server

usage: org.apache.nifi.toolkit.tls.TlsToolkitMain [-a <arg>] [-c <arg>] [--configJsonIn <arg>] [-d <arg>] [-D <arg>] [-f <arg>] [-F] [-g] [-h] [-k <arg>] [-p
       <arg>] [-s <arg>] [-T <arg>] [-t <arg>]

Acts as a Certificate Authority that can be used by clients to get Certificates

 -a,--keyAlgorithm <arg>                   Algorithm to use for generated keys. (default: RSA)
 -c,--certificateAuthorityHostname <arg>   Hostname of NiFi Certificate Authority (default: localhost)
    --configJsonIn <arg>                   The place to read configuration info from (defaults to the value of configJson), implies useConfigJson if set.
                                           (default: configJson value)
 -d,--days <arg>                           Number of days issued certificate should be valid for. (default: 1095)
 -D,--dn <arg>                             The dn to use for the CA certificate (default: CN=YOUR_CA_HOSTNAME,OU=NIFI)
 -f,--configJson <arg>                     The place to write configuration info (default: config.json)
 -F,--useConfigJson                        Flag specifying that all configuration is read from configJson to facilitate automated use (otherwise configJson will
                                           only be written to.
 -g,--differentKeyAndKeystorePasswords     Use different generated password for the key and the keyStore.
 -h,--help                                 Print help and exit.
 -k,--keySize <arg>                        Number of bits for generated keys. (default: 2048)
 -p,--PORT <arg>                           The port for the Certificate Authority to listen on (default: 8443)
 -s,--signingAlgorithm <arg>               Algorithm to use for signing certificates. (default: SHA256WITHRSA)
 -T,--keyStoreType <arg>                   The type of keyStores to generate. (default: jks)
 -t,--token <arg>                          The token to use to prevent MITM (required and must be same as one used by clients)

In my case, I run the TLS toolkit on my node-3, and I run the following command:

./bin/tls-toolkit.sh server -c node-3 -t myTokenToUseToPreventMITM -p 9999

On each of my nodes, I created a nifi directory in /etc and ran a command using the toolkit to get my signed certificates generated into my current directory:

.../bin/tls-toolkit.sh client -c node-3 -t myTokenToUseToPreventMITM -p 9999

And I have the following generated files on each of my nodes:

  • config.json
  • keystore.jks
  • nifi-cert.pem
  • truststore.jks

I now configure ZooKeeper as described here. Here is a short list of the tasks (in bold, what changed slightly compared with my previous post):

  • configure conf/zookeeper.properties to list the ZK nodes
  • configure ZK state ID file
  • set nifi.state.management.embedded.zookeeper.start
  • set nifi.zookeeper.connect.string
  • set nifi.cluster.protocol.is.secure=true
  • set nifi.cluster.is.node=true
  • set nifi.cluster.node.address=node-<1-3>
  • set nifi.cluster.node.protocol.port=9998
  • set nifi.remote.input.host=node-<1-3>
  • set nifi.remote.input.secure=true
  • set nifi.remote.input.socket.port=9997
  • set nifi.web.https.host=node-<1-3>
  • set nifi.web.https.port=8443

(ports are changed to ensure there is no conflict with the TLS toolkit running the CA server)
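For the first two ZooKeeper tasks, here is a minimal sketch under a few assumptions: the helper names are hypothetical, dataDir in conf/zookeeper.properties keeps its default value (./state/zookeeper), and the server lines use the usual 2888 and 3888 ports.

```shell
#!/bin/sh
# zk_server_lines HOST...: print the server.N lines to put in
# conf/zookeeper.properties (2888 = quorum port, 3888 = election port).
zk_server_lines() {
  n=1
  for host in "$@"; do
    printf 'server.%d=%s:2888:3888\n' "$n" "$host"
    n=$((n + 1))
  done
}

# write_myid NIFI_HOME ID: create the ZooKeeper id file for this node.
# Assumes dataDir is left at ./state/zookeeper.
write_myid() {
  zk_dir=$1/state/zookeeper
  mkdir -p "$zk_dir" || return 1
  printf '%s\n' "$2" > "$zk_dir/myid"
}

# Example on node-2 (the NiFi home path is a placeholder):
# zk_server_lines node-1 node-2 node-3 >> /path/to/nifi/conf/zookeeper.properties
# write_myid /path/to/nifi 2
```

The id written in myid must match the N of the server.N line that names the same host.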

I now configure the following properties with what has been generated by the toolkit:

nifi.security.keystore=
nifi.security.keystoreType=
nifi.security.keystorePasswd=
nifi.security.keyPasswd=
nifi.security.truststore=
nifi.security.truststoreType=
nifi.security.truststorePasswd=
nifi.security.needClientAuth=

For one of my nodes, this gives:

nifi.security.keystore=/etc/nifi/keystore.jks
nifi.security.keystoreType=jks
nifi.security.keystorePasswd=4nVzULZ+CBiPCePvFSCw4LDAvzNoumAqu+TcyeDQ1ac
nifi.security.keyPasswd=4nVzULZ+CBiPCePvFSCw4LDAvzNoumAqu+TcyeDQ1ac
nifi.security.truststore=/etc/nifi/truststore.jks
nifi.security.truststoreType=jks
nifi.security.truststorePasswd=0fFJ+pd4qkso0jC0jh7w7tLPPRSINYI6of+KnRBRVSw
nifi.security.needClientAuth=true

Then I generate a certificate for myself as a client to be able to authenticate against the NiFi UI:

.../bin/tls-toolkit.sh client -c node-3 -t myTokenToUseToPreventMITM -p 9999 -D "CN=pvillard,OU=NIFI" -T PKCS12

Don’t forget the -T option to get your client certificate in a format that is easy to import into your browser (PKCS12). This command also generates a nifi-cert.pem file that corresponds to the CA certificate; you will need to import it into your browser as well (and you might need to manually update the trust level on this certificate to ensure you have access to the UI).

At this point I’m able to fill in the authorizers.xml file. I need to specify myself as the initial admin identity (to access the UI with full administration rights), and specify each node of my cluster (using the DN provided with the generated certificates). This gives:

<authorizers>
    <authorizer>
        <identifier>file-provider</identifier>
        <class>org.apache.nifi.authorization.FileAuthorizer</class>
        <property name="Authorizations File">./conf/authorizations.xml</property>
        <property name="Users File">./conf/users.xml</property>
        <property name="Initial Admin Identity">CN=pvillard, OU=NIFI</property>
        <property name="Legacy Authorized Users File"></property>

        <property name="Node Identity 1">CN=node-1, OU=NIFI</property>
        <property name="Node Identity 2">CN=node-2, OU=NIFI</property>
        <property name="Node Identity 3">CN=node-3, OU=NIFI</property>
    </authorizer>
</authorizers>

WARNING – Please be careful when updating this file because identities are case-sensitive and whitespace-sensitive. For example, even though I specified

-D "CN=pvillard,OU=NIFI"

when executing the command to generate the certificates, it introduced a white space after the comma. The correct string to use in the configuration file is given in the output of the TLS toolkit when executing the command.
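If you no longer have the toolkit output at hand, the exact DN can also be read back from the generated keystore with keytool. This is a small sketch, assuming English-locale keytool output. The helper name owner_dn and the KEYSTORE_PASSWORD variable are hypothetical; the password is the one written in config.json.

```shell
#!/bin/sh
# owner_dn: read `keytool -list -v` output on stdin and print the DN of the
# first "Owner:" line, which is the certificate of the node itself.
owner_dn() {
  sed -n '/^Owner: /{s/^Owner: //p;q;}'
}

# Example (KEYSTORE_PASSWORD is a placeholder for the value in config.json):
# keytool -list -v -keystore /etc/nifi/keystore.jks -storepass "$KEYSTORE_PASSWORD" | owner_dn
```

The printed string, whitespace included, is what goes into the Node Identity property.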

Once I’ve updated this file on each node, I’m ready to start each node of the cluster.

./bin/nifi.sh start && tail -f ./logs/nifi-app.log

Once all nodes are correctly started, I am now able to access the NiFi UI using any of the nodes in the cluster:

NiFi UI / 3-nodes secured cluster

That’s all! Next post will use the current environment as a starting point to demonstrate how to scale up/down a NiFi cluster.