List/Fetch pattern and Remote Process Group in Apache NiFi

I see a lot of questions about how the List[X]/Fetch[X] processors work and how to load balance data over the nodes of a NiFi cluster once the data is already in the cluster. Since the question comes up quite often, let’s discuss the subject and try to understand how things work.

I will assume that you are running a NiFi cluster, since data balancing is not a concern with a standalone instance 😉

The first thing to understand is: when running a cluster, one of the nodes is randomly designated as the “Primary node”. The election takes place when the cluster starts, and there is no way to decide which node will be the primary node. OK… you could force things when your cluster starts, but there is no point in doing so if you want real high availability. In short: any node may become the Primary node at some point, so don’t assume that the Primary node will be a particular node.

The second thing to understand is: the flow that you design on your canvas runs on all the nodes independently. Each node of the cluster is responsible for its own data, and a relationship between two processors does not mean that the data going through this relationship will be balanced over the nodes. Unless you use a Remote Process Group (see below), the data will remain on the same node from the beginning to the end of the flow.

I will use the following example to illustrate my explanations: I want to get files from a remote SFTP server and put the files into HDFS.

  • First idea (bad idea!) / GetSFTP -> PutHDFS

Screen Shot 2017-02-23 at 11.04.42 AM.png

The first option could be the Get/Put pattern, which is perfectly fine with a standalone instance. However, this will cause issues if you have a NiFi cluster. Remember? The flow is running on all hosts of your cluster. The problem is that your nodes will concurrently access the same files on the SFTP server… and if the processor is configured to delete the file once retrieved (the default behavior), you will see errors showing up. Conclusion: always keep in mind that a processor runs on all the nodes and can cause concurrent access errors, depending on the remote system.

  • Second idea (not efficient!) / GetSFTP on Primary Node -> PutHDFS

The second option is to configure the GetSFTP processor to only run on the Primary Node (in the Scheduling tab of the processor configuration):

Screen Shot 2017-02-23 at 11.10.12 AM.png

This way, you solve the concurrent access problem, since only one node of your cluster (the Primary node) will run the GetSFTP processor.

Brief aside: remember, the flow is running on all the nodes. However, if the processor is configured to run on the primary node only, it simply won’t be scheduled on the nodes that are not the primary node. That’s all.

The problem with this approach is that it’s not efficient at all. You get data from only one node (this does not scale at all) and, in the end, only the primary node of your cluster is actually handling the data. Why? Because, unless you explicitly use a Remote Process Group, the data will remain on the same node from the beginning to the end. In this case, only the primary node will actually get data from the SFTP server and push it into HDFS.

  • Recommended pattern : ListSFTP -> RPG / Input Port -> FetchSFTP -> PutHDFS

To solve these issues, the List/Fetch pattern was developed and is now widely used across many processors. The idea is the following: the List processor only lists the data to retrieve on the remote system and gets the associated metadata (it does not get the data itself). For each listed item, a flow file (with no content) is generated and its attributes are populated with the metadata. The flow file is then sent to the Fetch processor, which actually retrieves the data from the remote system based on the metadata (for example, the path of the file on the remote system). Since each flow file contains the metadata of a specific item on the remote system, you won’t have concurrent access even if you have multiple Fetch processors running in parallel.
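The split between the two steps can be sketched in a few lines of Python. This is only a toy model of the pattern, not NiFi code: the `FlowFile` class, the `REMOTE_FILES` dictionary standing in for the SFTP server, and the `/data/in` directory are all hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FlowFile:
    """A content-less flow file: attributes only, as emitted by a List step."""
    path: str
    filename: str


# Hypothetical remote directory (filename -> content), standing in for the SFTP server.
REMOTE_FILES = {"a.csv": b"1,2,3", "b.csv": b"4,5,6", "c.csv": b"7,8,9"}


def list_remote(directory: str) -> list[FlowFile]:
    """List step: emit one flow file per remote item, carrying metadata only."""
    return [FlowFile(path=directory, filename=name) for name in sorted(REMOTE_FILES)]


def fetch_remote(flow_file: FlowFile) -> bytes:
    """Fetch step: retrieve the content designated by the flow file attributes."""
    return REMOTE_FILES[flow_file.filename]


listing = list_remote("/data/in")
# Each flow file designates a different remote file, so parallel fetchers never collide.
contents = {ff.filename: fetch_remote(ff) for ff in listing}
```

Because the listing happens once and each fetch is driven by a distinct flow file, any number of workers can run the fetch step without stepping on each other.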

Obviously the List processor is meant to run on the Primary node only. You then have to balance the generated flow files over the nodes so that the Fetch processor on each node has flow files to process. For this purpose you have to use a Remote Process Group.

A Remote Process Group is an abstract object used to connect two NiFi setups together (the communication between the two is what we call Site-to-Site, or S2S). It can connect a MiNiFi instance to a NiFi cluster, a NiFi cluster to another NiFi cluster, a standalone NiFi to a NiFi cluster, etc. It can also be used to connect a NiFi cluster to itself! This way the flow files will be balanced over all the nodes of the cluster. A few things to know about a Remote Process Group:

  1. You need to have an input port on the remote instance you are connecting to (in our case, you need a remote input port on your canvas).
  2. The address you give when configuring your remote process group does not matter in terms of high availability: once the connection is established with one of the nodes of the remote instance, the remote process group will be aware of all the nodes of the remote instance and will manage the case where the node specified in the address goes down.
  3. Your instances need to be configured to allow remote access. The required properties are:

# Site to Site properties
nifi.remote.input.host=
nifi.remote.input.secure=
nifi.remote.input.socket.port=
nifi.remote.input.http.enabled=
nifi.remote.input.http.transaction.ttl=

In the case of our SFTP example, it looks like:

Screen Shot 2017-02-23 at 11.47.40 AM.png

Let’s try to understand what is going on from a cluster perspective. Here is what we have in the case of a 3-node NiFi cluster with ListSFTP running on the primary node only:

Screen Shot 2017-02-23 at 12.03.22 PM.png

When scheduled, ListSFTP lists the three files on my remote SFTP server and generates one flow file for each remote file. Each flow file has no content but has attributes carrying the metadata of the remote file. In the case of ListSFTP, I’ll have (see the “Writes Attributes” section of the documentation):

Name                   Description
sftp.remote.host       The hostname of the SFTP Server
sftp.remote.port       The port that was connected to on the SFTP Server
sftp.listing.user      The username of the user that performed the SFTP Listing
file.owner             The numeric owner id of the source file
file.group             The numeric group id of the source file
file.permissions       The read/write/execute permissions of the source file
file.lastModifiedTime  The timestamp of when the file in the filesystem was last modified, as 'yyyy-MM-dd'T'HH:mm:ssZ'
filename               The name of the file on the SFTP Server
path                   The fully qualified name of the directory on the SFTP Server from which the file was pulled

The ListSFTP processor will generate 3 flow files and, for now, all flow files are only on the primary node:

Screen Shot 2017-02-23 at 1.59.43 PM.png

Now the Remote Process Group has been configured to connect to the cluster itself, and I connected the relationship going from ListSFTP to the Remote Process Group to the input port I created (the remote system may expose multiple input ports, and you can choose which one to connect to, depending on your needs). When the RPG (Remote Process Group) has communication enabled, it is aware of the three nodes and will balance the data across the remote nodes (be aware that there are many Site-to-Site parameters you can tune to improve efficiency). In my case that would give something like:

Screen Shot 2017-02-23 at 1.59.55 PM.png

Note: that would be the ideal case in terms of balancing but, for efficiency, the Site-to-Site mechanism might send batches of flow files to a remote node. In the above example, with only 3 flow files, I would probably not end up with one flow file per node.
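To see why batching changes the picture, here is a small Python sketch. It assumes plain round-robin over the nodes and a hypothetical `batch_size` parameter; the real Site-to-Site peer selection is more elaborate, so treat this purely as an illustration of the effect.

```python
from itertools import cycle


def distribute(flow_files, nodes, batch_size):
    """Send consecutive batches of flow files to nodes in round-robin order."""
    assignment = {node: [] for node in nodes}
    targets = cycle(nodes)
    for start in range(0, len(flow_files), batch_size):
        node = next(targets)
        assignment[node].extend(flow_files[start:start + batch_size])
    return assignment


nodes = ["node-1", "node-2", "node-3"]
files = ["file-1", "file-2", "file-3"]

# Ideal case: one flow file per batch, so every node receives one file.
ideal = distribute(files, nodes, batch_size=1)
# Batches of two: node-1 receives two files, node-2 one, and node-3 nothing.
batched = distribute(files, nodes, batch_size=2)
```

With a large number of flow files the imbalance evens out, which is why it only shows up in tiny examples like this one.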

Now, since we have everything in the attributes of our flow files, we need to use the Expression Language to set the properties of the FetchSFTP processor to use the attributes of the incoming flow files:

screen-shot-2017-02-23-at-1-55-13-pm
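As a rough illustration of what happens for each incoming flow file, the Python sketch below resolves `${attribute}` references against a set of attributes. The resolver is a deliberately simplified stand-in for the Expression Language (it handles plain attribute references only, no functions), and the property names, property values, and attribute values are assumptions about a typical FetchSFTP configuration, not a copy of the screenshot.

```python
import re


def resolve(expression: str, attributes: dict[str, str]) -> str:
    """Replace each ${name} reference with the matching flow file attribute."""
    return re.sub(r"\$\{([^}]+)\}", lambda m: attributes[m.group(1)], expression)


# Attributes as ListSFTP would write them (the values here are made up).
attributes = {
    "sftp.remote.host": "sftp.example.com",
    "sftp.remote.port": "22",
    "sftp.listing.user": "nifi",
    "path": "/data/in",
    "filename": "a.csv",
}

# Assumed FetchSFTP property values referencing those attributes.
fetch_properties = {
    "Hostname": "${sftp.remote.host}",
    "Port": "${sftp.remote.port}",
    "Username": "${sftp.listing.user}",
    "Remote File": "${path}/${filename}",
}

resolved = {name: resolve(value, attributes) for name, value in fetch_properties.items()}
```

Each flow file yields its own resolved values, so the same processor configuration fetches a different remote file every time.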

This way, each instance of the FetchSFTP processor will take care of its own file (to actually fetch the content of the remote data) and there won’t be any concurrent access:

Screen Shot 2017-02-23 at 2.00.12 PM.png

All your nodes are retrieving data and you really can scale up your cluster depending on your requirements. Note also that PutHDFS won’t be an issue either, since each node will write its own file.

As I said previously, a lot of processors are embracing this pattern (and this is the recommended way to use such processors with a NiFi cluster), and I’d strongly encourage you to do the same when developing your custom processors.

As always questions/comments are welcome.

HAProxy load balancing in front of Apache NiFi

There are a lot of situations where load balancing is necessary, even more so when you have a clustered architecture. In the context of NiFi you may want to consider two situations:

  • You have a NiFi cluster and you don’t want to give users the IP addresses of your NiFi nodes to access the UI (remember: every node of the cluster can be used for the UI or to make REST API calls), and besides you want to load balance users across all the nodes of the cluster. In addition, if you scale your cluster up or down, you don’t want to have to inform all the users of the change.
  • You have Listen[X] processors (HTTP, TCP, UDP, Syslog, etc.) in your workflow that are not running on the primary node only, and you want all the nodes to receive data (to increase performance and have full scalability). Again, you don’t want to configure the data senders with all the IPs of your NiFi nodes, and when the cluster changes you don’t want to make changes on the client side.

In such situations you need a load balancer that will stand in front of your NiFi cluster and will provide a VIP (Virtual IP). For this purpose, you have hardware options, software options and also cloud options. Keep in mind that, if you want a production setup, you’ll need to have your load balancer installed in a high availability fashion otherwise it’ll become a single point of failure.

In this article, I’ll use HAProxy, a widely used open source software load balancer. I’ll launch my HAProxy instance in a Docker container (I’ll also use Portainer as a UI to help me manage my Docker environment).

Some useful links that can help you with the different tools I’m going to use:

I assume I already have my secured 3-node NiFi cluster up and running. I’ve also already set up my Docker and Portainer environment: I’m ready to pull the HAProxy Docker image and create my container.

Let’s start with my HAProxy instance. First of all, in Portainer, I just need to pull the image I want (in this case I want the latest version and I just need to enter haproxy):

Screen Shot 2017-02-08 at 12.23.09 PM.png

I now have the image available:

screen-shot-2017-02-08-at-12-24-14-pm

Let’s build a container. In order to ease the modification of the configuration file of HAProxy, I’ll define a bind mount to bind the configuration file inside the container to a file I have on my computer. I also need to define all the ports I want to expose in my container. Since my container will run on one of the nodes of my cluster I don’t want to use the same ports (but in practice, your HAProxy would probably be on its own host).

  • Open my container on 9443, and I’ll configure HAProxy to listen on 9443 and to redirect requests made on 9443 to my NiFi nodes on port 8443 (port of my NiFi UI)
  • Open my container on 9999, and I’ll configure HAProxy to listen on 9999 and to redirect requests made on 9999 to my NiFi nodes on port 8888 (port that I’ll use for my ListenHTTP processors)
  • Open my container on 1936 and map on port 1936 inside my container (that’s the port used by the HAProxy management UI)

In Portainer, I add a container and define the following:

Screen Shot 2017-02-08 at 2.42.02 PM.png

When I create my container, it will immediately stop because I didn’t configure my configuration file so far and the container won’t start if the configuration file is invalid or does not exist. You can check the logs of your container in Portainer in case of issue.

In our case we’ll have HTTP access to send data to our ListenHTTP processors and HTTPS access to the UI. For HTTPS, I don’t want my load balancer to take care of certificates; I just want my requests to pass through (pure TCP load balancing). This raises an issue for users accessing the UI, because the certificate presented by the NiFi node won’t match the address the client is requesting (the load balancer address). This could be solved by adding a SAN (Subject Alternative Name) to the certificate of the nodes, but that will be discussed in another article (in the meantime you can have a look at NIFI-3331). In this case (and just because this is a demo!), I’ll accept the SSL exceptions in my browser.

OK so let’s see my HAProxy configuration file:

global

defaults
    log     global
    mode    http
    option  httplog
    option  dontlognull
    timeout connect 5000
    timeout client  50000
    timeout server  50000

frontend nifi-ui
    bind *:9443
    mode tcp
    default_backend nodes-ui

backend nodes-ui
    mode tcp
    balance roundrobin
    stick-table type ip size 200k expire 30m
    stick on src
    option httpchk HEAD / HTTP/1.1\r\nHost:localhost
    server nifi01 node-1:8443 check check-ssl verify none
    server nifi02 node-2:8443 check check-ssl verify none
    server nifi03 node-3:8443 check check-ssl verify none

frontend nifi-listen-http
    bind *:9999
    mode http
    default_backend nodes-http

backend nodes-http
    mode http
    balance roundrobin
    option forwardfor
    http-request set-header X-Forwarded-Port %[dst_port]
    option httpchk HEAD / HTTP/1.1\r\nHost:localhost
    server nifi01 node-1:8888 check
    server nifi02 node-2:8888 check
    server nifi03 node-3:8888 check

listen stats
    bind *:1936
    mode http
    stats enable
    stats uri /
    stats hide-version
    stats auth admin:password

Let’s see the different parts:

  • global – I put nothing in here
  • defaults – Just some timeouts and log configuration, nothing special
  • frontend nifi-ui – here I define a front end called “nifi-ui”. This is where I define on what port HAProxy should listen (in this case on port 9443, for TCP mode) and where I should redirect the requests (in this case to my back end called nodes-ui).
  • backend nodes-ui – here I define the back end to which the requests received by my front end are forwarded. I define TCP mode, round robin load balancing, stickiness (to ensure that a connected user, identified by source IP, stays on the same node over multiple requests), and the available nodes. I also define the health check that HAProxy performs to determine whether nodes are up or down. In this case it’s an HTTP HEAD request over SSL, and I don’t verify the SSL certificates.
  • frontend nifi-listen-http – here I define the front end that will be used to send data to my ListenHTTP processors on port 9999.
  • backend nodes-http – here I define the back end with my servers, the HTTP mode, the health check, and the addition of two HTTP headers (X-Forwarded-For and X-Forwarded-Port) to keep track of the client IP and of the load balancer port the client connected to (otherwise I’d only know about the load balancer when receiving data in NiFi).
  • listen stats – here are some parameters about the HAProxy management UI, like the port, the login and password, etc.
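The combination of round robin and source-IP stickiness used in the nodes-ui back end can be pictured with the following Python sketch. It is a simplified model, not HAProxy’s implementation: it ignores the stick-table size limit, the 30-minute expiry, and health checks, and the client IPs are made up.

```python
from itertools import cycle


class StickyRoundRobin:
    """Round-robin server selection with source-IP stickiness."""

    def __init__(self, servers):
        self._next_server = cycle(servers)
        self._stick_table = {}  # client IP -> server chosen on first contact

    def pick(self, client_ip):
        # A known client goes back to its server; a new one gets the next in rotation.
        if client_ip not in self._stick_table:
            self._stick_table[client_ip] = next(self._next_server)
        return self._stick_table[client_ip]


lb = StickyRoundRobin(["nifi01", "nifi02", "nifi03"])
first = lb.pick("10.0.0.1")   # new client: first server in rotation
second = lb.pick("10.0.0.2")  # new client: next server in rotation
again = lb.pick("10.0.0.1")   # returning client: same server as before
```

Stickiness matters for the UI because a user’s session should keep hitting the same node; the ListenHTTP back end has no such need and uses plain round robin.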

I restart my container to apply the configuration, and I can have a look at the management UI:

screen-shot-2017-02-08-at-4-37-45-pm

All is green meaning that our health checks are OK.

Assuming my load balancer has the following host name “my-nifi-vip”, I can now access the UI through https://my-nifi-vip:9443/nifi :

screen-shot-2017-02-08-at-4-41-53-pm

On my UI, I can configure a ListenHTTP processor as below:

Screen Shot 2017-02-08 at 4.43.14 PM.png

screen-shot-2017-02-08-at-4-45-05-pm

And send data to my cluster using the virtual IP provided by my load balancer:

while true; do curl -X POST http://my-nifi-vip:9999/test; done;

In the HAProxy management UI, I can confirm that my requests are correctly load balanced on all the nodes of my cluster:

screen-shot-2017-02-08-at-5-17-01-pm

It is now really easy to add or remove nodes in your NiFi cluster without impacting the clients sending data into NiFi, since they only need to know about the virtual IP exposed by the load balancer. You just need to update the configuration file when you add nodes, and restart your HAProxy container.

Keep in mind that even if the data is correctly load balanced over the nodes, all the requests go through a single point, the load balancer. Consequently, from a performance standpoint, your load balancer may become a bottleneck if you need to handle a very large number of connections per second. However, load balancers are designed to be as efficient as possible and you should be OK in most cases.

As always, questions/comments are welcome.

Using counters in Apache NiFi

You may not know it, but you have the ability to define and play with counters in NiFi. If policies are correctly configured (if your NiFi is secured), you should be able to access the existing counters using the menu:

Screen Shot 2017-02-07 at 5.33.59 PM.png

Counters are just values that you can increase or decrease by a given delta. This is useful if you want to monitor particular values along your workflow. At the moment, unless you use a processor that explicitly uses counters or provides a way to define counters, there is nothing available out of the box.

The best way to define and update counters is to use the ExecuteScript processor with the following piece of Groovy code:

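// Take the next flow file from the incoming queue; stop if there is none
def flowFile = session.get()
if(!flowFile) return
// Add 1 to "my-counter"; 'true' applies the update immediately
session.adjustCounter("my-counter", 1, true)
// Pass the flow file through unchanged
session.transfer(flowFile, REL_SUCCESS)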

With this example, the ExecuteScript processor will just transmit the flow file without any modification to the success relationship, but will also increment the counter “my-counter” by 1. If this counter does not exist, it will be initialized with the delta value given as argument.

Here is the documentation of this method:

    /**
     * Adjusts counter data for the given counter name and takes care of
     * registering the counter if not already present. The adjustment occurs
     * only if and when the ProcessSession is committed.
     *
     * @param name the name of the counter
     * @param delta the delta by which to modify the counter (+ or -)
     * @param immediate if true, the counter will be updated immediately,
     *            without regard to whether the ProcessSession is commit or rolled back;
     *            otherwise, the counter will be incremented only if and when the
     *            ProcessSession is committed.
     */
    void adjustCounter(String name, long delta, boolean immediate);
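The effect of the immediate flag described above can be modeled with a small Python sketch. This is a toy model of the documented behavior, not NiFi’s implementation; the `Session` class and the counter names are hypothetical.

```python
class Session:
    """Toy model of a session whose counter updates may wait for commit."""

    def __init__(self, counters):
        self.counters = counters  # shared counter store: name -> value
        self._pending = {}        # adjustments applied only on commit

    def adjust_counter(self, name, delta, immediate):
        target = self.counters if immediate else self._pending
        target[name] = target.get(name, 0) + delta

    def commit(self):
        for name, delta in self._pending.items():
            self.counters[name] = self.counters.get(name, 0) + delta
        self._pending.clear()

    def rollback(self):
        self._pending.clear()  # deferred adjustments are discarded


counters = {}

# A rolled-back session keeps only the immediate adjustment.
session = Session(counters)
session.adjust_counter("immediate-counter", 1, immediate=True)
session.adjust_counter("deferred-counter", 1, immediate=False)
session.rollback()
after_rollback = dict(counters)

# A committed session applies the deferred adjustment as well.
retry = Session(counters)
retry.adjust_counter("deferred-counter", 1, immediate=False)
retry.commit()
```

In practice, passing false is the safer choice when the counter should only reflect flow files that were actually processed successfully.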

Let’s see an example: I want to confirm the correct behavior of the GetHDFS processor when I have multiple instances of this processor looking into the same directory but getting different flow files based on a regular expression.

Here is the first part of my flow:

screen-shot-2017-02-07-at-6-20-52-pm

Basically, I am generating flow files every 1ms with GenerateFlowFile. The generated flow files are named after the generation timestamp (without any extension). I send the files into HDFS and then use a RouteOnAttribute processor where I check the filename against a regular expression to split files into even and uneven names. This way I can increment counters tracking the number of files I sent to HDFS with even names and with uneven names.
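The routing logic can be sketched in Python as follows. The regular expressions are an assumption (the filename is treated as a numeric timestamp and only its last digit is checked); the expressions used in the actual flow are not shown in this post, and the sample filenames are made up.

```python
import re

# Assumed patterns: a numeric filename ending with an even or an odd digit.
EVEN = re.compile(r"^\d*[02468]$")
UNEVEN = re.compile(r"^\d*[13579]$")


def route(filename):
    """Return the relationship a flow file with this filename would follow."""
    if EVEN.match(filename):
        return "even"
    if UNEVEN.match(filename):
        return "uneven"
    return "unmatched"


filenames = ["1486487452000", "1486487452001", "1486487452003"]

# Increment one producer counter per route, as the flow does after PutHDFS.
counters = {}
for name in filenames:
    key = f"{route(name)}-producer"
    counters[key] = counters.get(key, 0) + 1
```

Since every timestamp ends with exactly one digit, the two patterns are mutually exclusive and together cover every generated file.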

Here is the second part of my flow:

Screen Shot 2017-02-07 at 6.27.28 PM.png

I have two instances of the GetHDFS processor configured to look into the same input directory, one with a regular expression matching files with an even name and one matching files with an uneven name, so there is no concurrent access. Besides, the processor is configured to delete the file on HDFS once it has been retrieved into NiFi. Then I update two different counters: one tracking the number of files with an even name that I retrieved from HDFS, and one for files with an uneven name.

If everything is working correctly, I should be able to let my workflow run for a bit, then stop the generation of flow files, wait for all the flow files to be processed and confirm that:

  • even-producer counter is equal to even-consumer counter
  • uneven-producer counter is equal to uneven-consumer counter

Let’s have a look into our counters table:

screen-shot-2017-02-07-at-6-47-02-pm

It looks like we are all good 😉

As a remark, if you have multiple processors updating the same counter, then you will have the global value of the counter but also the value at each processor level. For example, if I have:

screen-shot-2017-02-07-at-6-53-09-pm

With both ExecuteScript incrementing the same “test” counter, then, I’ll have:

screen-shot-2017-02-07-at-6-55-48-pm

Also, as a last remark, you can notice that it’s possible to reset a counter to 0 from the counters table with the button in the last column (assuming you have write access to the counters based on the defined policies). It can be useful when doing some tests.

As always, questions/comments are welcome!

Apache NiFi 1.1.0 – Secured cluster setup

Apache NiFi 1.1.0 is now out, and I want to discuss a specific subject in a couple of posts: how to scale a NiFi cluster up and down without losing data. Before going into this subject, I want to set up a 3-node secured cluster using the NiFi toolkit. It will be my starting point to scale up my cluster with an additional node, and then scale it down.

There are already great posts describing how to set up a secured cluster using embedded ZK and taking advantage of the NiFi toolkit for the certificates, so I won’t go into too much detail. For reference, here are some great articles I recommend:

OK… let’s move on. My initial setup is the following:

  • My OS X laptop
  • 2 CentOS 7 VMs

All nodes meet the prerequisites (network, Java, etc.) and have the NiFi 1.1.0 and NiFi toolkit 1.1.0 binary archives available and uncompressed.

On my OS X laptop, I will use the NiFi TLS toolkit in server mode so that it acts as a Certificate Authority that can be used by clients to get Certificates.

Here is the description of how to use the TLS toolkit in server mode:

./bin/tls-toolkit.sh server

usage: org.apache.nifi.toolkit.tls.TlsToolkitMain [-a <arg>] [-c <arg>] [--configJsonIn <arg>] [-d <arg>] [-D <arg>] [-f <arg>] [-F] [-g] [-h] [-k <arg>] [-p
       <arg>] [-s <arg>] [-T <arg>] [-t <arg>]

Acts as a Certificate Authority that can be used by clients to get Certificates

 -a,--keyAlgorithm <arg>                   Algorithm to use for generated keys. (default: RSA)
 -c,--certificateAuthorityHostname <arg>   Hostname of NiFi Certificate Authority (default: localhost)
    --configJsonIn <arg>                   The place to read configuration info from (defaults to the value of configJson), implies useConfigJson if set.
                                           (default: configJson value)
 -d,--days <arg>                           Number of days issued certificate should be valid for. (default: 1095)
 -D,--dn <arg>                             The dn to use for the CA certificate (default: CN=YOUR_CA_HOSTNAME,OU=NIFI)
 -f,--configJson <arg>                     The place to write configuration info (default: config.json)
 -F,--useConfigJson                        Flag specifying that all configuration is read from configJson to facilitate automated use (otherwise configJson will
                                           only be written to.
 -g,--differentKeyAndKeystorePasswords     Use different generated password for the key and the keyStore.
 -h,--help                                 Print help and exit.
 -k,--keySize <arg>                        Number of bits for generated keys. (default: 2048)
 -p,--PORT <arg>                           The port for the Certificate Authority to listen on (default: 8443)
 -s,--signingAlgorithm <arg>               Algorithm to use for signing certificates. (default: SHA256WITHRSA)
 -T,--keyStoreType <arg>                   The type of keyStores to generate. (default: jks)
 -t,--token <arg>                          The token to use to prevent MITM (required and must be same as one used by clients)

In my case, I run the TLS toolkit on my node-3, and I run the following command:

./bin/tls-toolkit.sh server -c node-3 -t myTokenToUseToPreventMITM -p 9999

On each of my nodes, I created a nifi directory in /etc and ran a command using the toolkit to get my signed certificates generated into my current directory:

.../bin/tls-toolkit.sh client -c node-3 -t myTokenToUseToPreventMITM -p 9999

And I have the following generated files on each of my nodes:

  • config.json
  • keystore.jks
  • nifi-cert.pem
  • truststore.jks

I now configure ZooKeeper as described here. Here is a short list of the tasks (in bold what slightly changed in comparison with my previous post):

  • configure conf/zookeeper.properties to list the ZK nodes
  • configure ZK state ID file
  • set nifi.state.management.embedded.zookeeper.start
  • set nifi.zookeeper.connect.string
  • set nifi.cluster.protocol.is.secure=true
  • set nifi.cluster.is.node=true
  • set nifi.cluster.node.address=node-<1-3>
  • set nifi.cluster.node.protocol.port=9998
  • set nifi.remote.input.host=node-<1-3>
  • set nifi.remote.input.secure=true
  • set nifi.remote.input.socket.port=9997
  • set nifi.web.https.host=node-<1-3>
  • set nifi.web.https.port=8443

(ports are changed to ensure there is no conflict with the TLS toolkit running the CA server)

I now configure the following properties with what has been generated by the toolkit:

nifi.security.keystore=
nifi.security.keystoreType=
nifi.security.keystorePasswd=
nifi.security.keyPasswd=
nifi.security.truststore=
nifi.security.truststoreType=
nifi.security.truststorePasswd=
nifi.security.needClientAuth=

For one of my nodes, this gives:

nifi.security.keystore=/etc/nifi/keystore.jks
nifi.security.keystoreType=jks
nifi.security.keystorePasswd=4nVzULZ+CBiPCePvFSCw4LDAvzNoumAqu+TcyeDQ1ac
nifi.security.keyPasswd=4nVzULZ+CBiPCePvFSCw4LDAvzNoumAqu+TcyeDQ1ac
nifi.security.truststore=/etc/nifi/truststore.jks
nifi.security.truststoreType=jks
nifi.security.truststorePasswd=0fFJ+pd4qkso0jC0jh7w7tLPPRSINYI6of+KnRBRVSw
nifi.security.needClientAuth=true

Then I generate a certificate for myself as a client to be able to authenticate against NiFi UI:

.../bin/tls-toolkit.sh client -c node-3 -t myTokenToUseToPreventMITM -p 9999 -D "CN=pvillard,OU=NIFI" -T PKCS12

Don’t forget the -T option to get your client certificate in a format that is easy to import into your browser (PKCS12). This command also generates a nifi-cert.pem file that corresponds to the CA certificate; you will need to import it into your browser as well (and you might need to manually update the trust level on this certificate to ensure you have access to the UI).

At this point I’m able to fill in the authorizers.xml file. I need to specify myself as the initial admin identity (to access the UI with full administration rights), and specify each node of my cluster (using the DN provided with the generated certificates). This gives:

<authorizers>
    <authorizer>
        <identifier>file-provider</identifier>
        <class>org.apache.nifi.authorization.FileAuthorizer</class>
        <property name="Authorizations File">./conf/authorizations.xml</property>
        <property name="Users File">./conf/users.xml</property>
        <property name="Initial Admin Identity">CN=pvillard, OU=NIFI</property>
        <property name="Legacy Authorized Users File"></property>

        <property name="Node Identity 1">CN=node-1, OU=NIFI</property>
        <property name="Node Identity 2">CN=node-2, OU=NIFI</property>
        <property name="Node Identity 3">CN=node-3, OU=NIFI</property>
    </authorizer>
</authorizers>

WARNING – Please be careful when updating this file because identities are case-sensitive and whitespace-sensitive. For example, even though I specified

-D "CN=pvillard,OU=NIFI"

when executing the command to generate the certificates, it introduced a white space after the comma. The correct string to use in the configuration file is given in the output of the TLS toolkit when executing the command.

Once I’ve updated this file on each node, I’m now ready to start each node of the cluster.

./bin/nifi.sh start && tail -f ./logs/nifi-app.log

Once all nodes are correctly started, I am now able to access the NiFi UI using any of the nodes in the cluster:

NiFi UI / 3-nodes secured cluster

That’s all! Next post will use the current environment as a starting point to demonstrate how to scale up/down a NiFi cluster.

Apache NiFi 1.0.0 – Cluster setup

As you may know, version 1.0.0-BETA of Apache NiFi was released a few days ago. The upcoming 1.0.0 release will be a great moment for the community, as it will mark the result of a lot of work over the last few months, with many new features being added.

The objective of the Beta release is to give people a chance to try this new version and to give feedback before the official major release, which will come shortly. If you want to preview this new version with its completely new look, you can download the binaries here, unzip them, and run NiFi (‘./bin/nifi.sh start’ or ‘./bin/run-nifi.bat’ for Windows); then you just have to access http://localhost:8080/nifi/.

The objective of this post is to briefly explain how to set up an unsecured NiFi cluster with this new version (a post on setting up a secured cluster will come shortly, with explanations on how to use a new tool that will be shipped with NiFi to ease the installation of a secured cluster).

One really important change with this new version is the new paradigm around cluster installation. From the NiFi documentation, we can read:

Starting with the NiFi 1.0 release, NiFi employs a Zero-Master Clustering paradigm. Each of the nodes in a NiFi cluster performs the same tasks on the data but each operates on a different set of data. Apache ZooKeeper elects one of the nodes as the Cluster Coordinator, and failover is handled automatically by ZooKeeper. All cluster nodes report heartbeat and status information to the Cluster Coordinator. The Cluster Coordinator is responsible for disconnecting and connecting nodes. As a DataFlow manager, you can interact with the NiFi cluster through the UI of any node in the cluster. Any change you make is replicated to all nodes in the cluster, allowing for multiple entry points to the cluster.

zero-master-cluster

OK, let’s start with the installation. As you may know, it is strongly recommended to use an odd number of ZooKeeper instances, with at least 3 nodes (to maintain a majority, also called a quorum). NiFi comes with an embedded instance of ZooKeeper, but you are free to use an existing cluster of ZooKeeper instances if you want. In this article, we will use the embedded ZooKeeper option.
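The arithmetic behind the odd-number recommendation is easy to check with a few lines of Python. The function names are mine; the only assumption is the usual strict-majority rule for a quorum.

```python
def quorum(ensemble_size):
    """Smallest number of instances that forms a strict majority."""
    return ensemble_size // 2 + 1


def tolerated_failures(ensemble_size):
    """How many instances can fail while a majority is still available."""
    return ensemble_size - quorum(ensemble_size)


summary = {n: (quorum(n), tolerated_failures(n)) for n in (3, 4, 5)}
```

A 4-instance ensemble needs 3 instances for a quorum and so tolerates a single failure, exactly like a 3-instance ensemble: the extra even instance adds cost without adding resilience, while going to 5 instances allows 2 failures.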

I will use my computer as the first instance. I also launched two virtual machines (with a minimal CentOS 7). All 3 instances are able to communicate with each other on the required ports. On each machine, I configure my /etc/hosts file with:

192.168.1.17 node-3
192.168.56.101 node-2
192.168.56.102 node-1

I deploy the binaries archive on my three instances and unzip it. I now have a NiFi directory on each one of my nodes.

The first thing is to configure the list of the ZK (ZooKeeper) instances in the configuration file ‘./conf/zookeeper.properties‘. Since our three NiFi instances will run the embedded ZK instance, I just have to complete the file with the following properties:

server.1=node-1:2888:3888
server.2=node-2:2888:3888
server.3=node-3:2888:3888

Then, everything happens in the ‘./conf/nifi.properties‘. First, I specify that NiFi must run an embedded ZK instance, with the following property:

nifi.state.management.embedded.zookeeper.start=true

I also specify the ZK connect string:

nifi.zookeeper.connect.string=node-1:2181,node-2:2181,node-3:2181
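Since the server list and the connect string are both derived from the same list of hosts, a short Python sketch can generate them together and avoid copy/paste mistakes. The helper names are hypothetical; the hostnames and ports (2888, 3888, 2181) are simply the ones used in this post.

```python
# Hypothetical helpers; hostnames and ports are the ones used in this post.
NODES = ["node-1", "node-2", "node-3"]


def zk_server_lines(nodes, quorum_port=2888, election_port=3888):
    """Build the 'server.N=host:port:port' lines for zookeeper.properties."""
    return [
        f"server.{index}={host}:{quorum_port}:{election_port}"
        for index, host in enumerate(nodes, start=1)
    ]


def zk_connect_string(nodes, client_port=2181):
    """Build the value of nifi.zookeeper.connect.string."""
    return ",".join(f"{host}:{client_port}" for host in nodes)


if __name__ == "__main__":
    print("\n".join(zk_server_lines(NODES)))
    print("nifi.zookeeper.connect.string=" + zk_connect_string(NODES))
```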

As you may notice, the ./conf/zookeeper.properties file has a property named dataDir. By default, this value is set to ./state/zookeeper. If more than one NiFi node is running an embedded ZK, it is important to tell each server which one it is.

To do that, you need to create a file named myid and place it in ZK’s data directory. The content of this file should be the index of the server, as previously specified by the server.<number> property.

On node-1, I’ll do:

mkdir -p ./state/zookeeper
echo 1 > ./state/zookeeper/myid

The same operation needs to be done on each node (don’t forget to change the ID).
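To avoid writing the wrong ID on a node, the index can be derived from the server.N lines themselves. The following Python sketch does that; the function names are mine, and it assumes the lines have exactly the form used in this post (server.N=host:port:port). The demonstration writes into a temporary directory rather than into a real NiFi installation.

```python
import re
import tempfile
from pathlib import Path

# Matches lines such as "server.1=node-1:2888:3888".
SERVER_LINE = re.compile(r"^server\.(\d+)=([^:\s]+):\d+:\d+$")


def server_ids(zk_properties_text):
    """Map each hostname to its index, read from the server.N lines."""
    ids = {}
    for line in zk_properties_text.splitlines():
        match = SERVER_LINE.match(line.strip())
        if match:
            ids[match.group(2)] = int(match.group(1))
    return ids


def write_myid(hostname, zk_properties_text, data_dir):
    """Create data_dir if needed and write this node's index into myid."""
    node_id = server_ids(zk_properties_text)[hostname]
    target = Path(data_dir)
    target.mkdir(parents=True, exist_ok=True)
    (target / "myid").write_text(f"{node_id}\n")
    return node_id


ZK_PROPERTIES = """\
dataDir=./state/zookeeper
server.1=node-1:2888:3888
server.2=node-2:2888:3888
server.3=node-3:2888:3888
"""

# Demonstration in a temporary directory; on a real node, data_dir would be
# the dataDir value (./state/zookeeper by default).
with tempfile.TemporaryDirectory() as tmp:
    demo_dir = Path(tmp) / "state" / "zookeeper"
    write_myid("node-2", ZK_PROPERTIES, demo_dir)
    print((demo_dir / "myid").read_text().strip())
```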

If you don’t do this, you may see the following kind of exceptions in the logs:

Caused by: java.lang.IllegalArgumentException: ./state/zookeeper/myid file is missing

Then we go to clustering properties. For this article, we are setting up an unsecured cluster, so we must keep:

nifi.cluster.protocol.is.secure=false

Then, we have the following properties:

nifi.cluster.is.node=true
nifi.cluster.node.address=node-1
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

I set the FQDN of the node I am configuring, and I choose the arbitrary 9999 port for the communication with the elected cluster coordinator. I apply the same configuration on my other nodes:

nifi.cluster.is.node=true
nifi.cluster.node.address=node-2
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

and

nifi.cluster.is.node=true
nifi.cluster.node.address=node-3
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

We have configured the exchanges between the nodes and the cluster coordinator, now let’s move to the exchanges between the nodes (to balance the data of the flows). We have the following properties:

nifi.remote.input.host=node-1
nifi.remote.input.secure=false
nifi.remote.input.socket.port=9998
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec

Again, I set the FQDN of the node I am configuring and I choose the arbitrary 9998 port for the Site-to-Site (S2S) exchanges between the nodes of my cluster. The same applies for all the nodes (just change the host property with the correct FQDN).

It is also important to set the FQDN for the web server property, otherwise we may get strange behaviors with all nodes identified as ‘localhost’ in the UI. Consequently, for each node, set the following property with the correct FQDN:

nifi.web.http.host=node-1
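Only three properties differ from one node to the other in this setup: nifi.cluster.node.address, nifi.remote.input.host and nifi.web.http.host. As a minimal sketch (the helper names are hypothetical, and this is not a tool shipped with NiFi), the per-node values can be applied to the text of a shared nifi.properties like this:

```python
# The three host-specific properties set in this post.
NODE_SPECIFIC_KEYS = (
    "nifi.cluster.node.address",
    "nifi.remote.input.host",
    "nifi.web.http.host",
)


def node_overrides(hostname):
    """Return the host-specific properties for one node."""
    return {key: hostname for key in NODE_SPECIFIC_KEYS}


def apply_overrides(properties_text, overrides):
    """Rewrite the 'key=value' lines whose key appears in overrides."""
    result = []
    for line in properties_text.splitlines():
        key, separator, _ = line.partition("=")
        key = key.strip()
        if separator and key in overrides:
            line = f"{key}={overrides[key]}"
        result.append(line)
    return "\n".join(result)


if __name__ == "__main__":
    template = "\n".join([
        "nifi.cluster.is.node=true",
        "nifi.cluster.node.address=",
        "nifi.cluster.node.protocol.port=9999",
        "nifi.remote.input.host=",
        "nifi.remote.input.socket.port=9998",
        "nifi.web.http.host=",
    ])
    for host in ("node-1", "node-2", "node-3"):
        print(f"# {host}")
        print(apply_overrides(template, node_overrides(host)))
```

Lines whose key is not in the overrides (including comments) are left untouched, so the shared settings stay identical on every node.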

And that’s all! Easy, isn’t it?

OK, let’s start our nodes and let’s tail the logs to see what’s going on there!

./bin/nifi.sh start && tail -f ./logs/nifi-app.log

If you look at the logs, you should see that one of the nodes gets elected as the cluster coordinator, and then you should see heartbeats created by the three nodes and sent to the cluster coordinator every 5 seconds.

You can connect to the UI using the node you want (you can have multiple users connected to different nodes, modifications will be applied on each node). Let’s go to:

http://node-2:8080/nifi

Here is what it looks like:

Screen Shot 2016-08-13 at 7.33.08 PM

As you can see in the top-left corner, there are 3 nodes in our cluster. Besides, if we go in the menu (button in the top-right corner) and select the cluster page, we have details on our three nodes:

Screen Shot 2016-08-13 at 7.35.28 PM

We see that my node-2 has been elected as cluster coordinator, and that my node-3 is my primary node. This distinction is important because some processors must run on a unique node (for data consistency) and in this case we will want it to run “On primary node” (example below).

We can display details on a specific node (“information” icon on the left):

Screen Shot 2016-08-13 at 7.35.48 PM

OK, let’s add a processor like GetTwitter. Since the flow will run on all nodes (with balanced data between the nodes), this processor must run on a single node if we don’t want to duplicate data. So, in the scheduling strategy, we will choose the strategy “On primary node”. This way, we don’t duplicate data, and if the primary node changes (because my node dies or gets disconnected), we won’t lose data: the workflow will still be executed.

Screen Shot 2016-08-13 at 7.45.19 PM

Then I can connect my processor to a PutFile processor to save the tweets in JSON by setting a local directory (/tmp/twitter):

Screen Shot 2016-08-13 at 7.52.25 PM

If I run this flow, all my JSON tweets will be stored on the primary node; the data won’t be balanced. To balance the data, I need to use an RPG (Remote Process Group): the RPG will connect to the coordinator to evaluate the load of each node and balance the data over the nodes. This gives us the following flow:

Screen Shot 2016-08-13 at 8.00.26 PM

I have added an input port called “RPG”, then I have added a Remote Process Group that I connected to “http://node-2:8080/nifi” and I enabled transmission so that the Remote Process Group was aware of the existing input ports on my cluster. Then in the Remote Process Group configuration, I enabled the RPG input port. I then connected my GetTwitter to the Remote Process Group and selected the RPG input port. Finally, I connected my RPG input port to my PutFile processor.

When running the flow, I now have balanced data all over my nodes (I can check in the local directory ‘/tmp/twitter‘ on each node).

That’s all for this post. I hope you enjoyed it and that it will be helpful if you are setting up a NiFi cluster. All comments/remarks are very welcome, and I kindly encourage you to download Apache NiFi, try it, and give feedback to the community if you have any.

Apache NiFi – MiNiFi is (almost) out!

This is quite a busy period for the Apache NiFi community: Apache NiFi 0.7.0 is about to be released (RC2 is coming), Apache NiFi 1.0.0 will probably be ready in the next few weeks (stay tuned) and… Apache MiNiFi 0.0.1 will officially be released next week (RC vote in progress and so far so good)! This is a great step for the community and I wanted to write a quick article about this new amazing tool!

Here is my step-by-step article to get your hands on MiNiFi and play with it. But first of all, if you want to know more about this great subproject, have a look here: https://nifi.apache.org/minifi/index.html

  • Build MiNiFi from sources

I downloaded the sources from this link: https://dist.apache.org/repos/dist/dev/nifi/minifi-0.0.1/minifi-0.0.1-source-release.zip

And built the sources using Maven: mvn clean install.

Two convenience binaries are generated as part of this process: the MiNiFi assembly and a MiNiFi Toolkit assembly.

  • Convert and validate MiNiFi templates

The toolkit can be used to convert a NiFi template (XML) into a MiNiFi template (YAML) or to validate a MiNiFi template.

For this process, I created a very simple template using NiFi. This template is made of a GenerateFlowFile processor to generate flow files with no content every 5 seconds, and an AttributesToJSON processor to extract the attributes of the flow file and generate a JSON document in the flow file content. Then it is connected to a Remote Process Group pointing to a local running NiFi instance configured to allow Site-to-Site communication.

minifi-flow

I then saved this template as an XML file (NiFi template).

If I look at the toolkit usage (in a Windows environment):

minifi-toolkit-0.0.1\bin> .\config.bat
Usage:

java org.apache.nifi.minifi.toolkit.configuration.ConfigMain <command> options

Valid commands include:
transform: Transform template xml into MiNiFi config YAML
validate: Validate config YAML

I now use the toolkit to convert my template:

.\config.bat transform MiNiFi-test.xml MiNiFi-test.yml

And I can also validate the generated configuration file:

.\config.bat validate MiNiFi-test.yml

Note: here is the working configuration I then used in MiNiFi.

  • Run MiNiFi

I now take the generated configuration and use it to replace the default one:

minifi-0.0.1\conf\config.yml

And I now start MiNiFi:

minifi-0.0.1\bin\run-minifi.bat

By looking at the logs (minifi-0.0.1\logs\minifi-app.log), we can see all the processors that are shipped with MiNiFi. Here is a list of the currently standard processors included with MiNiFi:

    org.apache.nifi.processors.standard.PostHTTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.RouteOnContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.FetchFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.EvaluateXPath || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.SplitContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListSFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ReplaceText || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.MergeContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ConvertCharacterSet || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutDistributedMapCache || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.HandleHttpRequest || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutSyslog || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.CompressContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ParseSyslog || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.RouteOnAttribute || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ModifyBytes || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ControlRate || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.HashAttribute || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.Base64EncodeContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.TailFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetHTTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.HashContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.EvaluateXQuery || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.IdentifyMimeType || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetJMSQueue || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListenTCP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.FetchDistributedMapCache || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutJMS || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.SplitXml || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.EvaluateRegularExpression || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListenSyslog || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ScanContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ConvertJSONToSQL || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.EncryptContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.FetchSFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.attributes.UpdateAttribute || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-update-attribute-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetJMSTopic || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ReplaceTextWithMapping || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.SplitJson || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.TransformXml || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.EvaluateJsonPath || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ExecuteProcess || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.MonitorActivity || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ValidateXml || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ExecuteSQL || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.SegmentContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutSFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ExecuteStreamCommand || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.LogAttribute || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.DistributeLoad || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GenerateFlowFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListenHTTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListenUDP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutSQL || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.RouteText || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListenRELP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.InvokeHTTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ExtractText || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.UnpackContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.AttributesToJSON || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutEmail || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.DetectDuplicate || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ScanAttribute || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.SplitText || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetSFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.HandleHttpResponse || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.DuplicateFlowFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.QueryDatabaseTable || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]

As you can see, we can, with a default installation, do a lot of amazing things!!!
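If you want a shorter view of that list, here is a small Python sketch that extracts the processor names from such lines. It assumes the lines look exactly like the ones listed above (class name, then ‘||‘, then the class loader); real log lines may carry extra prefixes, in which case the parsing would need to be adapted. The function name is mine.

```python
def processor_names(log_lines):
    """Return the sorted simple class names found before the '||' separator."""
    names = set()
    for line in log_lines:
        class_name, separator, _ = line.partition("||")
        class_name = class_name.strip()
        if separator and class_name.startswith("org.apache.nifi.processors."):
            names.add(class_name.rsplit(".", 1)[-1])
    return sorted(names)


# Two lines copied from the list above, used as sample input.
SAMPLE_LINES = [
    r"    org.apache.nifi.processors.standard.TailFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]",
    r"    org.apache.nifi.processors.attributes.UpdateAttribute || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-update-attribute-nar-0.0.1.nar-unpacked]",
]

if __name__ == "__main__":
    for name in processor_names(SAMPLE_LINES):
        print(name)
```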

Once MiNiFi is started, the configured flow will be automatically started, and I am now able to see in my running NiFi instance all the files I received from the MiNiFi instance through Site-to-Site communication into the input port I configured:

nifi-with-minifi

For the purpose of the demonstration, the above screenshot shows all the processors involved in this demonstration:

  • I used the red-circled elements to generate a NiFi template, convert it into a MiNiFi template and get it running into a MiNiFi 0.0.1 instance.
  • The green-circled part is the only part really running on my NiFi instance and it is receiving all the flow files generated by the MiNiFi instance.

That’s all for this quick demonstration of this new tool. Hope you enjoyed it and that you will give it a try!

  • Conclusion

MiNiFi is a very lightweight tool (40 MB!) that can be easily deployed on a lot of remote machines/servers as a single running instance to collect and quickly process data or to be a remote relay to communicate with NiFi. MiNiFi’s role should be understood from the perspective of an agent acting immediately at, or directly adjacent to, source sensors, systems, or servers.

It is really easy to use and deploy and will, hopefully, be widely used in the Internet of Things! Besides, it comes with the top-level security features that come with NiFi! A must!

US presidential election via Twitter using Apache NiFi, Spark, Hive and Zeppelin

This article describes a frequency and sentiment analysis based on real-time tweet streams in relation to the four main candidates in the US Presidential Election.

The main objective was to deploy and to test the available connector between Apache NiFi and Apache Spark, so I decided to implement the following use case:

In the end, I get real-time analytics such as:

  • frequency of tweets over time per candidate
  • percentage of negative, positive and neutral tweets per candidate
  • opinion trends over time for each candidate

The article is available on Hortonworks Community Connection website. And as always, please feel free to comment and/or ask questions.