Apache NiFi 1.0.0 – Cluster setup

As you may know, version 1.0.0-BETA of Apache NiFi was released a few days ago. The upcoming 1.0.0 release will be a great moment for the community, as it will mark the result of a lot of work over the last few months, with many new features added.

The objective of the Beta release is to give people a chance to try this new version and to give feedback before the official major release, which will come shortly. If you want to preview this new version with its completely new look, you can download the binaries here, unzip the archive, and run NiFi (‘./bin/nifi.sh start’ or ‘./bin/run-nifi.bat’ on Windows). Then you just have to access http://localhost:8080/nifi/.

The objective of this post is to briefly explain how to set up an unsecured NiFi cluster with this new version (a post on setting up a secured cluster will come shortly, with explanations on how to use a new tool that will ship with NiFi to ease the installation of a secured cluster).

One really important change with this new version is the new paradigm around cluster installation. From the NiFi documentation, we can read:

Starting with the NiFi 1.0 release, NiFi employs a Zero-Master Clustering paradigm. Each of the nodes in a NiFi cluster performs the same tasks on the data but each operates on a different set of data. Apache ZooKeeper elects one of the nodes as the Cluster Coordinator, and failover is handled automatically by ZooKeeper. All cluster nodes report heartbeat and status information to the Cluster Coordinator. The Cluster Coordinator is responsible for disconnecting and connecting nodes. As a DataFlow manager, you can interact with the NiFi cluster through the UI of any node in the cluster. Any change you make is replicated to all nodes in the cluster, allowing for multiple entry points to the cluster.

zero-master-cluster

OK, let’s start with the installation. As you may know, it is strongly recommended to use an odd number of ZooKeeper instances, with at least three nodes, so that a majority (also called a quorum) can still be maintained when a node fails. NiFi comes with an embedded instance of ZooKeeper, but you are free to use an existing cluster of ZooKeeper instances if you want. In this article, we will use the embedded ZooKeeper option.
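To make the quorum rule concrete, here is a small shell sketch. It is my own illustration, not something shipped with NiFi or ZooKeeper, and the function names quorum_size and tolerated_failures are made up for this example. A majority of n servers is floor(n/2) + 1, which is why going from 3 to 4 instances does not let you survive one more failure:

```shell
# Majority (quorum) size for an ensemble of n servers: floor(n/2) + 1.
quorum_size() {
  echo $(( $1 / 2 + 1 ))
}

# Number of servers that can be lost while a majority is still reachable.
tolerated_failures() {
  echo $(( $1 - ($1 / 2 + 1) ))
}

for n in 3 4 5; do
  echo "ensemble=$n quorum=$(quorum_size "$n") tolerated_failures=$(tolerated_failures "$n")"
done
# ensemble=3 quorum=2 tolerated_failures=1
# ensemble=4 quorum=3 tolerated_failures=1
# ensemble=5 quorum=3 tolerated_failures=2
```

With the three nodes used in this post, the cluster keeps a quorum as long as two of the embedded ZK instances are up.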

I will use my computer as the first instance. I also launched two virtual machines (with a minimal CentOS 7). All three instances are able to communicate with each other on the required ports. On each machine, I configure my /etc/hosts file with:

192.168.1.17 node-3
192.168.56.101 node-2
192.168.56.102 node-1

I deploy the binary archive on my three instances and unzip it. I now have a NiFi directory on each of my nodes.

The first step is to configure the list of ZK (ZooKeeper) instances in the configuration file ‘./conf/zookeeper.properties’. Since our three NiFi instances will each run the embedded ZK instance, I just have to complete the file with the following properties:

server.1=node-1:2888:3888
server.2=node-2:2888:3888
server.3=node-3:2888:3888

Then, everything happens in ‘./conf/nifi.properties’. First, I specify that NiFi must run an embedded ZK instance, with the following property:

nifi.state.management.embedded.zookeeper.start=true

I also specify the ZK connect string:

nifi.zookeeper.connect.string=node-1:2181,node-2:2181,node-3:2181

As you may notice, the ./conf/zookeeper.properties file has a property named dataDir. By default, this value is set to ./state/zookeeper. If more than one NiFi node is running an embedded ZK, it is important to tell each server which one it is.

To do that, you need to create a file named myid and place it in ZK’s data directory. The content of this file should be the index of the server, as previously specified by the server.N property (for example, 1 for node-1, which is declared as server.1).

On node-1, I’ll do:

mkdir -p ./state/zookeeper
echo 1 > ./state/zookeeper/myid

The same operation needs to be done on each node (don’t forget to change the ID).
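To avoid typing the wrong ID by hand, the ID can be derived from the server.N lines already present in zookeeper.properties. The following is only a sketch under my own assumptions: write_myid is a hypothetical helper (not part of NiFi), it expects the host name exactly as written in the server.N entries, and the demo runs in a temporary directory rather than in a real NiFi installation:

```shell
# Find the index N of the "server.N=<host>:..." line for the given host
# and write it to <data_dir>/myid. Hypothetical helper, not shipped with NiFi.
# Note: the host is used as-is in a sed pattern, so a "." in it matches any character.
write_myid() {
  local host="$1" props="$2" data_dir="$3" id
  id=$(sed -n "s/^server\.\([0-9][0-9]*\)=${host}:.*/\1/p" "$props" | head -n 1)
  if [ -z "$id" ]; then
    echo "no server.N entry for $host in $props" >&2
    return 1
  fi
  mkdir -p "$data_dir"
  echo "$id" > "$data_dir/myid"
}

# Demo in a scratch directory, using the same entries as in this post.
demo_dir=$(mktemp -d)
printf '%s\n' \
  'server.1=node-1:2888:3888' \
  'server.2=node-2:2888:3888' \
  'server.3=node-3:2888:3888' > "$demo_dir/zookeeper.properties"

write_myid node-2 "$demo_dir/zookeeper.properties" "$demo_dir/state/zookeeper"
cat "$demo_dir/state/zookeeper/myid"   # prints 2
```

On a real node, the call would look like write_myid node-1 ./conf/zookeeper.properties ./state/zookeeper, run from the NiFi directory.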

If you don’t do this, you may see an exception like the following in the logs:

Caused by: java.lang.IllegalArgumentException: ./state/zookeeper/myid file is missing

Then we go to clustering properties. For this article, we are setting up an unsecured cluster, so we must keep:

nifi.cluster.protocol.is.secure=false

Then, we have the following properties:

nifi.cluster.is.node=true
nifi.cluster.node.address=node-1
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

I set the FQDN of the node I am configuring, and I choose the arbitrary port 9999 for communication with the elected cluster coordinator. I apply the same configuration on my other nodes:

nifi.cluster.is.node=true
nifi.cluster.node.address=node-2
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

and

nifi.cluster.is.node=true
nifi.cluster.node.address=node-3
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

We have configured the exchanges between the nodes and the cluster coordinator. Now let’s move on to the exchanges between the nodes themselves (to balance the data of the flows). We have the following properties:

nifi.remote.input.host=node-1
nifi.remote.input.secure=false
nifi.remote.input.socket.port=9998
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec

Again, I set the FQDN of the node I am configuring and I choose the arbitrary port 9998 for the Site-to-Site (S2S) exchanges between the nodes of my cluster. The same applies to all the nodes (just change the host property to the correct FQDN).

It is also important to set the FQDN for the web server property, otherwise we may get strange behaviors with all nodes identified as ‘localhost’ in the UI. Consequently, for each node, set the following property with the correct FQDN:

nifi.web.http.host=node-1
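Since only the host name changes from one node to another, these edits can be scripted. Here is a minimal sketch, assuming the property names shown above; set_prop and configure_node are hypothetical helpers I wrote for this post, and the demo works on a temporary file instead of a real ./conf/nifi.properties:

```shell
# Set key=value in a properties file: replace the existing line or append a new one.
# Assumes the value contains no "|" or "&" (fine for host names and ports).
set_prop() {
  local key="$1" value="$2" file="$3" pattern tmp
  pattern=$(printf '%s' "$key" | sed 's/\./\\./g')   # escape dots for sed/grep
  if grep -q "^${pattern}=" "$file"; then
    tmp=$(mktemp)
    sed "s|^${pattern}=.*|${key}=${value}|" "$file" > "$tmp"
    cat "$tmp" > "$file"    # keep the original file's permissions
    rm -f "$tmp"
  else
    echo "${key}=${value}" >> "$file"
  fi
}

# Apply the node-specific settings described in this post.
configure_node() {
  local node="$1" file="$2"
  set_prop nifi.cluster.is.node true "$file"
  set_prop nifi.cluster.node.address "$node" "$file"
  set_prop nifi.remote.input.host "$node" "$file"
  set_prop nifi.web.http.host "$node" "$file"
}

# Demo on a scratch copy with empty default values.
props=$(mktemp)
cat > "$props" <<'EOF'
nifi.web.http.host=
nifi.cluster.is.node=false
nifi.cluster.node.address=
nifi.remote.input.host=
EOF
configure_node node-2 "$props"
cat "$props"
```

The ports (9999 and 9998) are identical on every node in this setup, so they can be set once and copied along with the rest of the file.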

And that’s all! Easy, isn’t it?

OK, let’s start our nodes and let’s tail the logs to see what’s going on there!

./bin/nifi.sh start && tail -f ./logs/nifi-app.log

If you look at the logs, you should see that one of the nodes gets elected as the cluster coordinator, and then you should see heartbeats created by the three nodes and sent to the cluster coordinator every 5 seconds.

You can connect to the UI using any node you want (you can have multiple users connected to different nodes; modifications will be applied on every node). Let’s go to:

http://node-2:8080/nifi

Here is what it looks like:

[Screenshot: NiFi UI on node-2]

As you can see in the top-left corner, there are 3 nodes in our cluster. In addition, if we open the menu (button in the top-right corner) and select the cluster page, we get details on our three nodes:

[Screenshot: cluster page listing the three nodes]

We see that my node-2 has been elected as cluster coordinator, and that my node-3 is my primary node. This distinction is important because some processors must run on a unique node (for data consistency) and in this case we will want it to run “On primary node” (example below).

We can display details on a specific node (“information” icon on the left):

[Screenshot: details of a specific node]

OK, let’s add a processor like GetTwitter. Since the flow will run on all nodes (with the data balanced between the nodes), this processor must run on a single node if we don’t want to duplicate data. So, in the scheduling strategy, we choose “On primary node”. This way, we don’t duplicate data, and if the primary node changes (because the node dies or gets disconnected), we won’t lose data: the workflow will still be executed.

[Screenshot: scheduling strategy set to “On primary node”]

Then I can connect my processor to a PutFile processor to save the tweets in JSON by setting a local directory (/tmp/twitter):

[Screenshot: GetTwitter connected to PutFile]

If I run this flow, all my JSON tweets will be stored on the primary node; the data won’t be balanced. To balance the data, I need to use an RPG (Remote Process Group). The RPG will connect to the coordinator to evaluate the load of each node and balance the data across the nodes. This gives us the following flow:

[Screenshot: flow using a Remote Process Group]

I have added an input port called “RPG”, then I have added a Remote Process Group that I connected to “http://node-2:8080/nifi”, and I enabled transmission so that the Remote Process Group became aware of the existing input ports on my cluster. Then, in the Remote Process Group configuration, I enabled the RPG input port. I then connected my GetTwitter to the Remote Process Group and selected the RPG input port. Finally, I connected my RPG input port to my PutFile processor.

When running the flow, I now have the data balanced across all my nodes (I can check in the local directory ‘/tmp/twitter’ on each node).

That’s all for this post. I hope you enjoyed it and that it will be helpful if you are setting up a NiFi cluster. All comments and remarks are very welcome, and I kindly encourage you to download Apache NiFi, try it, and give feedback to the community if you have any.

Apache NiFi – MiNiFi is (almost) out!

This is quite a busy period for the Apache NiFi community: Apache NiFi 0.7.0 is about to be released (RC2 is coming), Apache NiFi 1.0.0 will probably be ready in the next few weeks (stay tuned) and… Apache MiNiFi 0.0.1 will officially be released next week (RC vote in progress and so far so good)! This is a great step for the community and I wanted to write a quick article about this amazing new tool!

Here is my step-by-step article to get hands-on with MiNiFi and play with it. But first of all, if you want to know more about this great subproject, have a look here: https://nifi.apache.org/minifi/index.html

  • Build MiNiFi from sources

I downloaded the sources from this link: https://dist.apache.org/repos/dist/dev/nifi/minifi-0.0.1/minifi-0.0.1-source-release.zip

And built the sources using Maven: mvn clean install.

Two convenience binaries are generated as part of this process: the MiNiFi assembly and the MiNiFi Toolkit assembly.

  • Convert and validate MiNiFi templates

The toolkit can be used to convert a NiFi template (XML) into a MiNiFi template (YAML) or to validate a MiNiFi template.

For this process, I created a very simple template using NiFi. This template is made of a GenerateFlowFile processor, which generates flow files with no content every 5 seconds, and an AttributesToJSON processor, which extracts the attributes of the flow file and writes them as JSON into the flow file content. It is then connected to a Remote Process Group pointing to a locally running NiFi instance configured to allow Site-to-Site communication.

minifi-flow

I then saved this template as an XML file (NiFi template).

If I look at the toolkit usage (in a Windows environment):

minifi-toolkit-0.0.1\bin> .\config.bat
Usage:

java org.apache.nifi.minifi.toolkit.configuration.ConfigMain <command> options

Valid commands include:
transform: Transform template xml into MiNiFi config YAML
validate: Validate config YAML

I now use the toolkit to convert my template:

.\config.bat transform MiNiFi-test.xml MiNiFi-test.yml

And I can also validate the generated configuration file:

.\config.bat validate MiNiFi-test.yml

Note: here is the working configuration I then used in MiNiFi.

  • Run MiNiFi

I now take the generated configuration and use it to replace the default one:

minifi-0.0.1\conf\config.yml

And I now start MiNiFi:

minifi-0.0.1\bin\run-minifi.bat

By looking at the logs (minifi-0.0.1\logs\minifi-app.log), we can see all the processors that are shipped with MiNiFi. Here is the list of standard processors currently included with MiNiFi:

    org.apache.nifi.processors.standard.PostHTTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.RouteOnContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.FetchFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.EvaluateXPath || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.SplitContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListSFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ReplaceText || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.MergeContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ConvertCharacterSet || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutDistributedMapCache || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.HandleHttpRequest || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutSyslog || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.CompressContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ParseSyslog || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.RouteOnAttribute || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ModifyBytes || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ControlRate || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.HashAttribute || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.Base64EncodeContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.TailFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetHTTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.HashContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.EvaluateXQuery || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.IdentifyMimeType || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetJMSQueue || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListenTCP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.FetchDistributedMapCache || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutJMS || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.SplitXml || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.EvaluateRegularExpression || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListenSyslog || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ScanContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ConvertJSONToSQL || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.EncryptContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.FetchSFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.attributes.UpdateAttribute || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-update-attribute-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetJMSTopic || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ReplaceTextWithMapping || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.SplitJson || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.TransformXml || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.EvaluateJsonPath || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ExecuteProcess || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.MonitorActivity || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ValidateXml || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ExecuteSQL || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.SegmentContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutSFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ExecuteStreamCommand || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.LogAttribute || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.DistributeLoad || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GenerateFlowFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListenHTTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListenUDP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutSQL || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.RouteText || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListenRELP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.InvokeHTTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ExtractText || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.UnpackContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.AttributesToJSON || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutEmail || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.DetectDuplicate || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ScanAttribute || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.SplitText || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetSFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.HandleHttpResponse || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.DuplicateFlowFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.QueryDatabaseTable || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]

As you can see, we can, with a default installation, do a lot of amazing things!!!

Once MiNiFi is started, the configured flow will be automatically started, and I am now able to see in my running NiFi instance all the files I received from the MiNiFi instance through Site-to-Site communication into the input port I configured:

nifi-with-minifi

For the purpose of the demonstration, the above screenshot shows all the processors involved:

  • I used the red-circled elements to generate a NiFi template, convert it into a MiNiFi template and get it running in a MiNiFi 0.0.1 instance.
  • The green-circled part is the only part really running on my NiFi instance and it is receiving all the flow files generated by the MiNiFi instance.

That’s all for this quick demonstration of this new tool. Hope you enjoyed it and that you will give it a try!

  • Conclusion

MiNiFi is a very lightweight tool (40 MB!) that can easily be deployed on a lot of remote machines/servers as a single running instance to collect and quickly process data, or to act as a remote relay communicating with NiFi. The role of MiNiFi should be viewed from the perspective of an agent acting immediately at, or directly adjacent to, source sensors, systems, or servers.

It is really easy to use and deploy and will, hopefully, be widely used in the Internet of Things! Besides, it comes with the top-level security features that come with NiFi! A must!

US presidential election via Twitter using Apache NiFi, Spark, Hive and Zeppelin

This article describes a frequency and sentiment analysis based on real-time tweet streams relating to the four main candidates in the US Presidential Election.

The main objective was to deploy and to test the available connector between Apache NiFi and Apache Spark, so I decided to implement the following use case:

At the end, I get real time analytics such as:

  • frequency of tweets over time per candidate
  • percentage of negative, positive and neutral tweets per candidate
  • opinion trends over time for each candidate

The article is available on Hortonworks Community Connection website. And as always, please feel free to comment and/or ask questions.

OAuth 1.0A with Apache NiFi (Twitter API example)

A lot of APIs use the OAuth protocol to authorize the requests they receive and to verify the identity of the request sender.

OAuth is an open standard for authorization, commonly used as a way for Internet users to log into third party websites using their Microsoft, Google, Facebook, Twitter, One Network etc. accounts without exposing their password. Generally, OAuth provides to clients a “secure delegated access” to server resources on behalf of a resource owner. It specifies a process for resource owners to authorize third-party access to their server resources without sharing their credentials. Designed specifically to work with Hypertext Transfer Protocol (HTTP), OAuth essentially allows access tokens to be issued to third-party clients by an authorization server, with the approval of the resource owner. The third party then uses the access token to access the protected resources hosted by the resource server.

As a side note, there are two versions of the protocol currently in use out there: 1.0A and 2.0. As far as I know, 1.0A is the most commonly used. I had already faced the need to use the OAuth 1.0A protocol with the Flickr API but, back then, I found a way to get my data differently.

Recently, a question was asked on the Hortonworks Community Connection regarding the use of Apache NiFi to get data from the Twitter API using the OAuth 1.0A protocol. So this time, I decided to have a look at this and get the job done.

This post presents the flow I used to perform a request against the Twitter API using the OAuth protocol. It gives me the opportunity to use, for the first time, the ExecuteScript processor, which allows users to execute custom scripts on the fly inside NiFi (you will find a lot of examples on this great site).

Note 1: this is the first time I have used the Groovy language, so be nice to me!

Note 2: I didn’t test the flow on a lot of methods. Some modifications may be necessary for some cases.

OK. The objective was to request the “users/lookup” method of the Twitter API. You can read the documentation here.

I want to perform an HTTP GET on:

https://api.twitter.com/1.1/users/lookup.json?screen_name=twitterapi,twitter

So far, it seems really easy to do with a simple InvokeHTTP processor. The thing is, you need to identify yourself when sending the request. This is where the OAuth protocol comes in. The official specification for 1.0A can be read here. But in the case of the Twitter API, there is nice documentation here.

Besides, on the documentation page of each method, there is an OAuth Signature Generator that you can access (if you have defined a Twitter App). The generator is here. It lets you play around and gives great insight into each request, which helps to debug your own implementation of the OAuth protocol.

The general idea is this: you have a lot of input parameters and you must follow the specification to construct a string based on these parameters. This string will be the value associated with the “Authorization” key in the HTTP header properties.

Here is the list of the needed parameters. First the parameters directly linked to your request:

Then the global parameters related to OAuth:

  • Consumer key (private information of your app provided by Twitter)
  • Consumer secret (private information of your app provided by Twitter)
  • Nonce (random string, uniquely generated for each request)
  • Signature method (with Twitter it is HMAC-SHA1)
  • Timestamp (in seconds)
  • Token (private information of your app provided by Twitter)
  • Token secret (private information of your app provided by Twitter)
  • Version (in this case 1.0)

The first step is to construct the “signature base string”. For that, you first need to create the “parameter string”. It is all very well explained here. Once you have the signature base string, you can sign it using HMAC-SHA1 and you easily get the header property to set in your HTTP request:

Authorization: OAuth oauth_consumer_key="*******", oauth_nonce="a9ab2392e5158a4c4e029c7829164571", oauth_signature="4s4Hi5hQ%2FoLKprW7qsRlImds3Og%3D", oauth_signature_method="HMAC-SHA1", oauth_timestamp="1460453975", oauth_token="*******", oauth_version="1.0"
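Before moving to NiFi, the steps above can be sketched outside of it with a few shell functions. This is only an illustration of the OAuth 1.0A signing steps under my own assumptions, not the code used in the flow: the function names are made up, percent_encode only handles ASCII input, the openssl command-line tool is assumed to be installed, and CONSUMER_KEY, TOKEN, CONSUMER_SECRET and TOKEN_SECRET are placeholders.

```shell
# Percent-encode a string: keep unreserved characters, encode the rest as %XX.
# ASCII input only (sketch).
percent_encode() {
  local s="$1" out="" c
  local LC_ALL=C
  while [ -n "$s" ]; do
    c=${s%"${s#?}"}   # first character
    s=${s#?}          # remainder
    case "$c" in
      [A-Za-z0-9._~-]) out="$out$c" ;;
      *) out="$out$(printf '%%%02X' "'$c")" ;;
    esac
  done
  printf '%s' "$out"
}

# Read raw "key=value" lines on stdin; print the parameter string:
# encoded pairs sorted by key, joined with "&".
parameter_string() {
  local line
  while IFS= read -r line; do
    printf '%s=%s\n' "$(percent_encode "${line%%=*}")" "$(percent_encode "${line#*=}")"
  done | LC_ALL=C sort -t '=' -k1,1 -k2 | paste -sd '&' -
}

# METHOD & encoded base URL & encoded parameter string.
signature_base_string() {
  printf '%s&%s&%s' "$1" "$(percent_encode "$2")" "$(percent_encode "$3")"
}

# Base64 of the HMAC-SHA1 of data ($1) with key ($2), using the openssl CLI.
hmac_sha1_base64() {
  printf '%s' "$1" | openssl dgst -sha1 -hmac "$2" -binary | openssl base64
}

# Example with the request from this post; keys and secrets are placeholders.
params=$(printf '%s\n' \
  'screen_name=twitterapi,twitter' \
  'oauth_consumer_key=CONSUMER_KEY' \
  'oauth_nonce=a9ab2392e5158a4c4e029c7829164571' \
  'oauth_signature_method=HMAC-SHA1' \
  'oauth_timestamp=1460453975' \
  'oauth_token=TOKEN' \
  'oauth_version=1.0' | parameter_string)
base=$(signature_base_string GET 'https://api.twitter.com/1.1/users/lookup.json' "$params")
signing_key="$(percent_encode 'CONSUMER_SECRET')&$(percent_encode 'TOKEN_SECRET')"

echo "$base"
# The signature is percent-encoded again before going into the header.
percent_encode "$(hmac_sha1_base64 "$base" "$signing_key")"; echo
```

Because the secrets here are placeholders, the printed signature will not match the one in the header shown above. The point is only the order of operations: parameter string, then signature base string, then HMAC-SHA1 with a signing key made of the two secrets.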

Let’s get into the details using Apache NiFi. Here is the flow I created:

oauthFlow

I use a GenerateFlowFile to generate an empty Flow File (FF) in order to execute my flow. Then I use an UpdateAttribute processor to add attributes to my FF. In this case, I only add the parameters related to the specific request I want to execute:

globalParam

Then I send my FF into a process group that will compute the header property to set (I will come back to this part later). Then I perform my request using the InvokeHTTP processor:

invokeHTTP

I set the method to GET, the URL to my corresponding FF attribute, the content type to text/plain and I add a custom property named Authorization with the FF attribute I created in my process group (see below). This custom property will be added as an HTTP header in the request. At the end, I use a PutFile processor to write the result of my request to a local directory.

Let’s go to the interesting part of our flow where all the magic is: the process group I named OAuth 1.0A. Here it is:

processGroup

I just use two processors. The first one is an UpdateAttribute to add all the parameters I need as attributes of my FF. The second one is an ExecuteScript processor where I put my Groovy code to compute the header property.

First… the parameters:

oauthParameters

Note: I use Expression Language provided by NiFi for some attributes.

  • arguments is used to extract the argument part in my target URL. In this example: screen_name=twitterapi,twitter
  • base_url is the URL I request without any argument. In this example: https://api.twitter.com/1.1/users/lookup.json
  • For the nonce parameter, I use the “UUID” method of the expression language, which generates a random string, and I just remove the ‘-’ characters to keep only an alphanumeric string.
  • For the timestamp, I use the “now” method of the expression language and I use substring to only keep seconds.
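For comparison, the same two values can be produced outside NiFi in a couple of lines of shell. This is a sketch rather than what the flow does: instead of a UUID with the dashes removed, it reads 16 random bytes from /dev/urandom and prints them as 32 hexadecimal characters, and the function names oauth_nonce and oauth_timestamp are my own.

```shell
# Nonce: 16 random bytes printed as 32 lowercase hex characters.
# (Swapped-in technique: random bytes instead of a UUID with '-' removed.)
oauth_nonce() {
  od -An -tx1 -N16 /dev/urandom | tr -d ' \n'
}

# Timestamp: number of seconds since the Unix epoch.
oauth_timestamp() {
  date +%s
}

echo "oauth_nonce=$(oauth_nonce)"
echo "oauth_timestamp=$(oauth_timestamp)"
```

Both values must be regenerated for every request, since the nonce is meant to be unique per request.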

Let’s move to the ExecuteScript part. I set the script engine to Groovy and I put my script code in the “script body” property. The full code is available at the end of the post. Let’s go through it piece by piece.

First thing, I want to trigger my code only when a FF is available:

def flowFile = session.get()
if (!flowFile) return

Then I define a method I will use for the HMAC-SHA1 encoding:

def static hmac(String data, String key) throws java.security.SignatureException
{
    String result
    try {
        // get an hmac_sha1 key from the raw key bytes (explicit charset instead of the platform default)
        SecretKeySpec signingKey = new SecretKeySpec(key.getBytes("UTF-8"), "HmacSHA1")
        // get an hmac_sha1 Mac instance and initialize it with the signing key
        Mac mac = Mac.getInstance("HmacSHA1")
        mac.init(signingKey)
        // compute the hmac on the input data bytes
        byte[] rawHmac = mac.doFinal(data.getBytes("UTF-8"))
        // Base64-encode the raw hmac bytes
        result = rawHmac.encodeBase64().toString()
    } catch (Exception e) {
        throw new SignatureException("Failed to generate HMAC : " + e.getMessage())
    }
    return result
}

For this part, I will need to add some imports at the beginning of my script body:

import java.security.SignatureException
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec
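
To cross-check the value produced by the Groovy method, the same HMAC-SHA1 + Base64 computation can be sketched in Python with the standard library only. The function name hmac_sha1_base64 is hypothetical, and I assume here that both the data and the key are UTF-8 strings.

```python
import base64
import hashlib
import hmac


def hmac_sha1_base64(data, key):
    # Sign the UTF-8 bytes of `data` with `key`, then Base64-encode the raw digest.
    digest = hmac.new(key.encode("utf-8"), data.encode("utf-8"), hashlib.sha1).digest()
    return base64.b64encode(digest).decode("ascii")


print(hmac_sha1_base64("The quick brown fox jumps over the lazy dog", "key"))
```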

Then I retrieve all the attributes of my FF and I extract into variables the ones I handle separately (the request arguments, the method, the base URL and the two secrets):

def attributes = flowFile.getAttributes()
// retrieve arguments of the target and split arguments
def arguments = attributes.arguments.tokenize('&')
def method = attributes.method
def base_url = attributes.base_url
def consumerSecret = attributes.oauth_consumer_secret
def tokenSecret = attributes.oauth_token_secret

Then I create a TreeMap in which I add all the parameters I need to construct my parameter string. A TreeMap guarantees that the entries are sorted by key in alphabetical order.

TreeMap map = [:]

for (String item : arguments) {
        def (key, value) = item.tokenize('=')
        map.put(key, value)
}

map.put("oauth_consumer_key", attributes.oauth_consumer_key)
map.put("oauth_nonce", attributes.oauth_nonce)
map.put("oauth_signature_method", attributes.oauth_signature_method)
map.put("oauth_timestamp", attributes.oauth_timestamp)
map.put("oauth_token", attributes.oauth_token)
map.put("oauth_version", attributes.oauth_version)

Then I add a method to the String class to allow percent encoding on String objects:

String.metaClass.encode = {
    java.net.URLEncoder.encode(delegate, "UTF-8").replace("+", "%20").replace("*", "%2A").replace("%7E", "~");
}
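
As a point of comparison, the same percent encoding can be sketched in Python with urllib.parse.quote. This is only an illustration (the name percent_encode is mine): everything except letters, digits, '-', '.', '_' and '~' is encoded, which is what the replace calls above achieve on top of URLEncoder.

```python
from urllib.parse import quote


def percent_encode(value):
    # Only letters, digits, '-', '.', '_' and '~' are left untouched;
    # a space becomes %20 (not '+') and '*' becomes %2A.
    return quote(value, safe="~")


print(percent_encode("Ladies + Gentlemen"))  # Ladies%20%2B%20Gentlemen
```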

I am now able to construct the parameter string:

String parameterString = ""

map.each { key, value ->
    parameterString += key.encode()
    parameterString += '='
    parameterString += value.encode()
    parameterString += '&'
}

parameterString = parameterString.substring(0, parameterString.length()-1);

Update: the code above can be simplified as below (see Andy’s comment)

String parameterString = map.collect { String key, String value ->
    "${key.encode()}=${value.encode()}"
}.join("&")

It is now possible to get the signature:

String signatureBaseString = ""
signatureBaseString += method.toUpperCase()
signatureBaseString += '&'
signatureBaseString += base_url.encode()
signatureBaseString += '&'
signatureBaseString += parameterString.encode()

String signingKey = consumerSecret.encode() + '&' + tokenSecret.encode()
String oauthSignature = hmac(signatureBaseString, signingKey)
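
To make the three steps (parameter string, signature base string, signing key) easier to follow, here is a compact Python sketch of the same computation. The function names, the example URL and the secrets are placeholders I made up for the illustration; it sorts the parameters by key like the TreeMap does.

```python
import base64
import hashlib
import hmac
from urllib.parse import quote


def enc(value):
    # Percent-encode everything except letters, digits, '-', '.', '_' and '~'.
    return quote(value, safe="~")


def signature_base_string(method, base_url, params):
    # Sort the parameters by key, encode keys and values, join them with '&'.
    parameter_string = "&".join(
        "{}={}".format(enc(k), enc(v)) for k, v in sorted(params.items())
    )
    return "&".join([method.upper(), enc(base_url), enc(parameter_string)])


def sign(base_string, consumer_secret, token_secret):
    # The signing key is the two encoded secrets joined by '&'.
    signing_key = enc(consumer_secret) + "&" + enc(token_secret)
    digest = hmac.new(signing_key.encode("utf-8"),
                      base_string.encode("utf-8"), hashlib.sha1).digest()
    return base64.b64encode(digest).decode("ascii")


# Placeholder values, only to show the shape of the result.
base = signature_base_string("get", "https://example.com/x", {"b": "2", "a": "1"})
print(base)  # GET&https%3A%2F%2Fexample.com%2Fx&a%3D1%26b%3D2
print(sign(base, "consumer-secret", "token-secret"))
```

Note that the parameter string is encoded a second time when it is placed in the base string, which is why '=' and '&' appear as %3D and %26.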

I may add this information as a new attribute of my FF:

flowFile = session.putAttribute(flowFile, 'oauth_signature', oauthSignature)

Then I can construct the header property value to associate to Authorization key:

String oauth = 'OAuth '
oauth += 'oauth_consumer_key="'
oauth += attributes.oauth_consumer_key.encode()
oauth += '", '
oauth += 'oauth_nonce="'
oauth += attributes.oauth_nonce.encode()
oauth += '", '
oauth += 'oauth_signature="'
oauth += oauthSignature.encode()
oauth += '", '
oauth += 'oauth_signature_method="'
oauth += attributes.oauth_signature_method.encode()
oauth += '", '
oauth += 'oauth_timestamp="'
oauth += attributes.oauth_timestamp.encode()
oauth += '", '
oauth += 'oauth_token="'
oauth += attributes.oauth_token.encode()
oauth += '", '
oauth += 'oauth_version="'
oauth += attributes.oauth_version.encode()
oauth += '"'
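
The header construction above is a simple join of key="value" pairs. A minimal Python sketch of the same idea, assuming the oauth_* values (including the signature) are available in a dictionary; the function name and the sample values are placeholders:

```python
from urllib.parse import quote


def authorization_header(oauth_params):
    # Each value is percent-encoded and wrapped in double quotes;
    # the pairs are sorted by key and separated by ', '.
    parts = [
        '{}="{}"'.format(key, quote(value, safe="~"))
        for key, value in sorted(oauth_params.items())
    ]
    return "OAuth " + ", ".join(parts)


# Placeholder values, only to show the shape of the header.
print(authorization_header({"oauth_nonce": "abc", "oauth_consumer_key": "k+1"}))
# OAuth oauth_consumer_key="k%2B1", oauth_nonce="abc"
```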

I add this information as an attribute (that will be used in the InvokeHTTP processor as we saw before) and I forward my FF to the success relationship:

flowFile = session.putAttribute(flowFile, 'oauth_header', oauth)
session.transfer(flowFile, REL_SUCCESS)

That’s it: I have an operational flow implementing the OAuth 1.0A protocol to send requests to the Twitter API.

The full code is available here as a gist.
The NiFi template is available here.

As always, feel free to ask questions and comment on this post!

 

URL shortener service with Apache NiFi

This blog will demonstrate a new use case using Apache NiFi: implement a URL shortener service. Let’s be clear right now, I don’t think Apache NiFi is the best option to propose such a service (this is not the idea behind this Apache project) but I believe this is an opportunity to play around with some processors/functionalities I never discussed so far.

Why this idea? Some months ago, for a job interview, I was asked to develop a URL shortener service in Go using Docker and Redis. This is available on GitHub here. Since this is something we can do with Apache NiFi, it is interesting to see how this can be achieved.

Before talking about Apache NiFi (if you don’t know about this project, have a look at my previous posts, this is a great tool!), let’s discuss what we want to achieve…

  • URL shortener service requirements

I want to expose a web service that gives me the opportunity to shorten long URLs in order to be able to share/remember them easily. I also want to store statistics about the number of times my shortened URL has been used. And, in our use case, I want my shortened URL to be valid for at least 24 hours.

  • Example

I want to shorten this URL:

https://pierrevillard.com/2016/04/04/analyze-flickr-account-using-apache/

Let’s say my service is running on my local computer, I am going to access it with my browser:

localhost/shortlink?url=https://pierrevillard.com/2016/04/04/analyze-flickr-account-using-apache/

I will get a web page displaying JSON data:

{"key":"6974","url":"https://pierrevillard.com/2016/04/04/analyze-flickr-account-using-apache/","date":1460238097015,"count":0}

From now on, I will be able to access my URL by using:

localhost/6974

And I will be able to access the statistics (JSON data) of my shortened URL at:

localhost/admin?key=6974

  • Implementation with Apache NiFi

This use case gives me the opportunity to discuss some nice features of Apache NiFi:

  1. The possibility to expose web services with the use of HandleHttpRequest and HandleHttpResponse processors in combination with a StandardHttpContextMap controller service. The controller service provides the ability to store and retrieve HTTP requests and responses external to a Processor, so that multiple processors can interact with the same HTTP request. In other words, the HandleHttpRequest processor initializes a Jetty web server listening for requests on a given port. Once a request is received, a FlowFile is generated with attributes and content (if any). The request is handled asynchronously, so the FlowFile can go through a data flow before the response is generated and sent back to the user by the HandleHttpResponse processor.

    Note: at this moment, for a given listening port, there can be only one instance of the HandleHttpRequest processor on the canvas. Handling different services with the same processor can be performed with the addition of a RouteOnAttribute processor (we’ll see that in this implementation).

  2. The possibility to store information across the NiFi cluster (but also to have a distributed map cache of key/value data) using PutDistributedMapCache and FetchDistributedMapCache processors in combination with the DistributedMapCacheServer and DistributedMapCacheClientService controller services. The map cache server controller service provides a map (key/value) cache that can be accessed over a socket. Interaction with this service is typically accomplished via a DistributedMapCacheClient service. This feature is mainly used to share information across a NiFi cluster but can also be used locally to store data in memory to be used along the flow.

    It has to be noted that, when using the PutDistributedMapCache processor, the key is given through the processor properties and the value is the content of the incoming FlowFile.

Let’s now describe the data flow I have created for the URL shortener service. First I have a single HandleHttpRequest processor with default configuration and a default StandardHttpContextMap controller service.

Then I use a RouteOnAttribute processor to define which URL has been accessed and to route the FlowFile (FF) accordingly. In the incoming FF, the attribute ‘http.request.uri’ contains the requested URL.

routeOnAttribute

At this point, I have three routes for my FF: one for the /shortlink requests, one for the /admin requests and one for the others.
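
The routing decision can be pictured with a few lines of Python. This is only a sketch of the logic: the actual conditions are Expression Language rules configured in the RouteOnAttribute processor, and the function name and returned labels below are hypothetical.

```python
def route(request_uri):
    # Hypothetical equivalent of the three routes: /shortlink, /admin, anything else.
    if request_uri.startswith("/shortlink"):
        return "shortlink"
    if request_uri.startswith("/admin"):
        return "admin"
    return "other"


print(route("/shortlink"), route("/admin"), route("/6974"))
```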

Let’s start with the ‘admin’ flow part. When I receive a request at:

/admin?key=<key>

My incoming FF will have an attribute ‘http.query.param.key’ with the value <key>. This is useful to retrieve all the arguments passed along the URL.

I decide to use the map cache server as follows: the key will be the key value of my JSON data, and the value will be the JSON string itself. Consequently, I use a FetchDistributedMapCache processor to retrieve the JSON data associated with the given key.

fetchdistributedmapcache

If I don’t find anything, I use a ReplaceText processor to set an arbitrary error message as the content of my FF and then a HandleHttpResponse processor with the 500 HTTP error code. This way, the user will see the error message:

No URL found for key=6666

If I find an entry in my cache server, then the FF content is now filled in with my JSON string and I just need to route my FF to a HandleHttpResponse processor with a 200 HTTP code. This way, the user will see the JSON string with related information.

Let’s continue with the “shortlink” part of the flow. First of all I use a RouteOnAttribute processor to check that the URL provided at:

/shortlink?url=<URL>

is valid given a regular expression. If not I display an error message to the user with the combination of ReplaceText processor and HandleHttpResponse processor (as explained above).

If the URL is valid, I want to generate a key associated to this URL.

Note: for this part, I made the choice to keep it really simple and there are a lot of possible improvements/optimizations.

I decide that the key will be a 4-digit number and I create this number using time manipulation with the Expression Language. With an UpdateAttribute processor, I generate a ‘key’ attribute:

keyGeneration

Once the key is generated I use a FetchDistributedMapCache processor to check if this key is already used or not.

If yes, my FF now contains the associated JSON string. I use an EvaluateJsonPath processor to extract the creation date from the JSON string and then I use a RouteOnAttribute processor to check if this creation date is less than 24 hours old. If yes, I route my FF back to the UpdateAttribute processor to generate a new random key; if no, it means the key can be overwritten and I route my FF to the next steps.

Once I have a generated key that is free to use, I use a ReplaceText processor to construct my JSON string:

JSONconstruction

Then I store this information in the cache using a PutDistributedMapCache processor:

putdistributedmapcache

Note the “replace if present” property in case we are overwriting an already existing key that is too old. Then I just route my FF to a HandleHttpResponse processor with a 200 HTTP code to display to the user the JSON string.
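
To summarize the “shortlink” logic in one place, here is a small Python sketch where a plain dictionary stands in for the distributed map cache. It is not how the flow is implemented: the key is drawn with random instead of the time manipulation used in the UpdateAttribute processor, and all names are made up for the illustration.

```python
import json
import random
import time

DAY_MS = 24 * 60 * 60 * 1000
cache = {}  # stands in for the distributed map cache: key -> JSON string


def shorten(url, now_ms=None, new_key=lambda: "%04d" % random.randint(0, 9999)):
    now_ms = int(time.time() * 1000) if now_ms is None else now_ms
    while True:
        key = new_key()
        existing = cache.get(key)
        # A key is free if it is unused or if its entry is at least 24 hours old.
        # (Like the flow, this loops until a free key is found.)
        if existing is None or now_ms - json.loads(existing)["date"] >= DAY_MS:
            break
    record = json.dumps({"key": key, "url": url, "date": now_ms, "count": 0})
    cache[key] = record  # same effect as "replace if present"
    return record


print(shorten("https://pierrevillard.com/2016/04/04/analyze-flickr-account-using-apache/"))
```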

As I said, this part is simple, there are a lot of possible improvements such as (but not limited to):

  • Have a text-based key with the possibility for the user to customize it in order to expand the number of shortened URLs that can be stored.
  • Use the cache server to store a sequence ID for the generated key in order to avoid randomness and possible loops in the flow.
  • Add the possibility to reuse the same key for two identical URLs to shorten.

Finally, let’s complete our use case with the last part of the flow: when an HTTP request is received that is neither a shortlink nor an admin request.

The accessed URL is obtained using the FF attributes, and I can directly use a FetchDistributedMapCache processor:

fetchforredirection

If I don’t find any entry in my cache, I route the FF to a combination of ReplaceText and HandleHttpResponse processors to display an error message to the user with a 404 HTTP error code.

If I find a match, I use an EvaluateJsonPath processor to extract the counter value and the long URL from the JSON string retrieved in the cache. Then I first route my FF to a HandleHttpResponse processor with a 307 HTTP code (temporary redirection) and I add an HTTP header property to redirect the user to the corresponding URL:

httpresponse

Note: I use the 307 HTTP code to prevent my browser from caching the redirection, so that the request is performed each time I access my shortened URL.

I also route my FF to a ReplaceText processor in order to increment the count value in my JSON string and I use a PutDistributedMapCache processor to update the data in the cache.
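
The redirection part can be sketched the same way in Python, again with a dictionary standing in for the cache. The function name resolve and the returned (status, location) pair are assumptions made for this illustration only.

```python
import json

# Stand-in for the distributed map cache, pre-filled with the example entry.
cache = {
    "6974": json.dumps({
        "key": "6974",
        "url": "https://pierrevillard.com/2016/04/04/analyze-flickr-account-using-apache/",
        "date": 1460238097015,
        "count": 0,
    })
}


def resolve(key):
    entry = cache.get(key)
    if entry is None:
        return 404, None  # no entry for this key
    record = json.loads(entry)
    record["count"] += 1  # update the statistics
    cache[key] = json.dumps(record)
    return 307, record["url"]  # temporary redirection to the long URL


print(resolve("6974"))
print(resolve("0000"))
```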

That’s it! We now have a running URL shortener service with Apache NiFi. The flow is available as a template here. As always, feel free to comment and/or ask questions about this post.

Analyze Flickr user interests using Apache NiFi and Spark

Let’s have some fun with Apache NiFi by studying a new use case: analyze a Flickr account to get some information regarding what kind of pictures the owner likes.

Before getting started, have a look at my previous posts if this is the first time you hear about NiFi. Also let me remind you that Flickr is a website where people can share their pictures. In addition, users can add pictures of other users as “favorites” (similar to a like on Facebook) and add other users as “contacts” to get all their recently published photos on their personal feed.

OK, so now I assume you know a little about Flickr. The nice thing is that there is an API exposed by this website to exchange information with it. To use it you need to request an API key. Once done, you’ll have a key and secret passphrase. You will need this information to request Flickr through Apache NiFi.

Let’s move to the global idea of what we are going to do: for a given account user we are going to get all the photos published by this user, all the photos liked (favorites) by this user, and most recent photos published by users followed by our guy. Next we will get all tags and groups associated to each photo. We will aggregate all this data and perform a word count to get an idea of what our initial user is looking for when using Flickr. Does it sound interesting? OK, let’s go then.

First, if you want to have details about the Flickr API and all exposed methods, go on this page. In our case, we are going to only use methods that do not require authentication (we are not going to fetch data that requires the authorization of the concerned user). The methods are:

  • flickr.people.getPhotos : to get photos published by a given user
  • flickr.favorites.getList : to get favorite photos of a given user
  • flickr.photos.getContactsPublicPhotos : to get recently published photos by contacts of a given user
  • flickr.tags.getListPhoto : to get the list of tags for a given photo
  • flickr.photos.getAllContexts : to get the list of groups (or pools) for a given photo

Note: regarding the photos published by contacts, we are only going to fetch the last 50 photos published by each contact of our user. It would be possible to find a workaround and get all the pictures of each contact, but this could represent millions of photos (and consequently millions of API calls) and it could skew the results of the analysis, since users differ a lot in how many photos they publish and how often. So let’s focus on a sample of the most recent pictures.

To “launch” our flow, I will start with a GenerateFlowFile processor to have only one Flow File (FF) created. Then I will use an UpdateAttribute processor to add attributes to my FF in order to store global properties that will be used along the flow:

  • User ID (or NSID) of the user account we want to analyze
  • API key to use
  • Page (set to 1) that will be used to handle pagination of answers
  • Format (set to json) to get JSON answers from the Flickr API

flickr_properties

Then I use InvokeHTTP processors to perform calls to the Flickr API. The configuration is almost the same for each processor of this kind, so I will only describe one: the processor used to get all favorite pictures of our user.

The only parameter I am going to change is the Remote URL. This parameter is accepting expression language so I can reference attributes of my FF. I set the Remote URL to:

https://api.flickr.com/services/rest/?method=flickr.favorites.getList&api_key=${apiKey}&user_id=${userId}&per_page=500&page=${page}&format=${responseFormat}

As you can see, I give the method of the API I want to request, and also the expected parameters for this method: in this case, the API key, the user ID, the number of elements returned per page (500 is the maximum allowed), the page number I want to request and the response format.
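
If you want to try the same call outside NiFi, the URL can be assembled like this in Python. It is just a sketch: the function name flickr_url and its default values are mine, and KEY is a placeholder for your own API key.

```python
from urllib.parse import urlencode


def flickr_url(method, api_key, user_id, page=1, per_page=500, response_format="json"):
    # Same parameters, in the same order, as the Remote URL above.
    query = urlencode([
        ("method", method),
        ("api_key", api_key),
        ("user_id", user_id),
        ("per_page", per_page),
        ("page", page),
        ("format", response_format),
    ])
    return "https://api.flickr.com/services/rest/?" + query


print(flickr_url("flickr.favorites.getList", "KEY", "100524190@N04"))
```

Note that urlencode percent-encodes the '@' of the user ID as %40.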

The result will look like:

jsonFlickrApi({"photos":{"page":1,"pages":10,"perpage":500,"total":"4559","photo":[{"id":"26164116671","owner":"100524190@N04","secret":"3f80e32734","server":"1653","farm":2,"title":"STAIRWAY TO HEAVEN","ispublic":1,"isfriend":0,"isfamily":0,"date_faved":"1459772801"}]},"stat":"ok"})

As you can see, the Flickr API returns a string which is not directly a ready-to-use JSON string. So I add an intermediary step where I remove “jsonFlickrApi(” at the beginning and “)” at the end of the FF content using two ReplaceText processors.
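
The cleaning step amounts to removing a fixed prefix and the final parenthesis. A minimal Python sketch of the same operation (the function name unwrap is hypothetical):

```python
import json

PREFIX = "jsonFlickrApi("


def unwrap(body):
    # Remove the 'jsonFlickrApi(' prefix and the trailing ')' if present,
    # then parse what remains as regular JSON.
    body = body.strip()
    if body.startswith(PREFIX) and body.endswith(")"):
        body = body[len(PREFIX):-1]
    return json.loads(body)


sample = 'jsonFlickrApi({"photos":{"page":1,"pages":10,"perpage":500,"total":"4559","photo":[]},"stat":"ok"})'
print(unwrap(sample)["photos"]["pages"])  # 10
```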

Then I want a system to handle the pagination aspect. For that I create a process group with one input port where I’ll send my FF, and with two output ports (I’ll come back to this very soon).

In this process group, I first use an EvaluateJsonPath processor to extract “page” and “pages” from the JSON string and get the values as attributes of my FF (note that this will update my already existing attribute “page”).

evaluate_page

 

Then I use a RouteOnAttribute processor and I use the Expression language to check if the current page number (attribute “page”) is equal to the total number of pages (attribute “pages”).

page_equal_pages

Whether it is true or not, I route my FF to the first output port that will send my FF to the next step of the flow. But if false, I also use an UpdateAttribute processor to increment the attribute “page” to get the new page number I must request:

increment

Note: I also update the “filename” attribute to ensure my FF have different filenames. It is useful when using PutFile processor along the flow for debug purposes.

Then I send this FF in the second output port which goes back to the InvokeHTTP processor but with a new page number. This way, it will loop over all pages to get all expected results.
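
Written as code, this pagination loop would look like the Python sketch below. The fetch_page callable is hypothetical: it stands for “InvokeHTTP + cleaning” and is assumed to return the parsed “photos” object of one page. I compare with >= rather than strict equality so that the loop also stops when there are zero pages.

```python
def fetch_all_pages(fetch_page):
    # Every page is passed on (first output port); the loop continues
    # with page + 1 (second output port) until the last page is reached.
    page = 1
    while True:
        result = fetch_page(page)
        yield result
        if result["page"] >= result["pages"]:
            break
        page = result["page"] + 1


# Fake fetcher used only to demonstrate the loop: 3 pages in total.
def fake_fetch(page):
    return {"page": page, "pages": 3, "photo": []}


print([r["page"] for r in fetch_all_pages(fake_fetch)])  # [1, 2, 3]
```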

The process group looks like:

processor_group

I use the same logic for the photos published by the user and for the photos published by the user’s contacts. At this point, I can use a funnel to gather all my results: I am ready to continue at photo level. So far the flow looks like:

flow_first_part

From this point, I use a SplitJson processor to get individual photo information:

splitjson

Then I use an EvaluateJsonPath to extract the photo ID of each photo and to get it as attribute of my FF:

extractPhotoId

I am now able to call the two methods to get tags and groups associated to the picture. I use the following Remote URLs:

Then, I clean the returned string once more to get a correct JSON string and I use SplitJson processors again to get each tag and group into individual FFs.

Splitting list of tags:

splittags

Splitting list of groups:

splitgroup

Then I use an EvaluateJsonPath to extract tag name and group name as my new FF content:

tagname

At the end, I use a MergeContent processor to concatenate my FFs into bigger FFs with one tag/group name per line. For this processor I use the following parameters:

mergecontent

Note: for the demarcator parameter, I press Shift+Enter to insert a line break.

This way, I get FFs of 20k lines, unless my incoming queue does not reach this limit within 5 minutes.

I end my flow with a PutFile processor to store the results in a given directory.

The second part of the flow looks like:

flow_snd_part

That’s it! I have completed the whole flow!

I ran this flow for a given user, it represented about 100k photos with about 450k tag/group names to analyze. To do that, I used Apache Spark and the famous word count example to extract most used keywords.

Once pyspark launched, I used the following commands:

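import re

# Read every file written by the PutFile processor (one tag/group name per line)
data = sc.textFile("D:/tmp/*")

# The enclosing parentheses allow the chained calls to span several lines
(data.flatMap(lambda line: re.sub('[^0-9a-zA-Z ]+', '', line).lower().split())
     .map(lambda word: (word, 1))
     .reduceByKey(lambda a, b: a + b)
     .map(lambda kv: (kv[1], kv[0]))  # swap to (count, word) to sort by count
     .sortByKey(False)                # descending order
     .take(200))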

You can notice that I did some cleaning on the characters and changed my strings to lower case. Once I have the 200 most used words, I do some manual cleaning to remove all unwanted things like “flickr”, “photo”, “the”, etc. And I finally get the 20 most used words related to the pictures I got through my flow for my given user:

word_count_result
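
If you don’t have Spark at hand, the same count can be reproduced on a small sample with plain Python. This is a swapped-in alternative using collections.Counter, not what I ran; the function name top_words is made up and it returns (word, count) pairs instead of (count, word).

```python
import re
from collections import Counter


def top_words(lines, n=200):
    counts = Counter()
    for line in lines:
        # Same cleaning as above: keep letters, digits and spaces, lower-case the rest.
        cleaned = re.sub(r"[^0-9a-zA-Z ]+", "", line).lower()
        counts.update(cleaned.split())
    return counts.most_common(n)


print(top_words(["Black & White", "black cat", "white"], 2))
```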

I hope you enjoyed going through this use case, and I am sure it will give you some ideas to have fun with Apache NiFi! Please feel free to comment or to ask questions about this post.

Get data from/to Dropbox using Apache NiFi

A few days ago, on the mailing list, a question was asked regarding the possibility to retrieve data from a smartphone using Apache NiFi. One suggestion was to use a cloud sharing service as an intermediary like Box, Dropbox, Google Drive, AWS, etc.

Obviously, solutions already exist to sync data from these services to your computer. But we could imagine using Apache NiFi to sync data between such services, or even better: to have an account on each of these services and use Apache NiFi as a single endpoint to use and manage all your accounts as a unique cloud storage facility. There are a lot of options out there.

For this post, let’s just use Apache NiFi to get data from/to a Dropbox account. I let you guys use your imagination to develop amazing things!

First, I need to allow my account to be accessed by an external application. To do that, I go to this page and I click on the button “Create App”. Then I choose the “Dropbox API” and, in my case, I select the “App folder” type of access (only one directory will be accessible to my application). Finally, I choose the name of my application: “NIFI”. This will automatically create a folder Applications/NIFI in my Dropbox (this will be seen as the root directory of your Dropbox account from your application’s point of view: see it as “/”).

Now you need to generate an access token for your application. On the page of your application you have a link to generate the token. Once generated, keep it securely for yourself!

OK! We have everything we need! Just to be sure all is working, you can check using curl that you have the access:

curl https://api.dropbox.com/1/account/info -H "Authorization:Bearer <YOUR-ACCESS-TOKEN>"

Note: here is a post about the new Dropbox API.

Now let’s jump to Apache NiFi to have a flow to automatically download files from the Dropbox folder.

1. List content in Dropbox folder

This can be achieved using:

curl -X POST https://api.dropboxapi.com/2-beta-2/files/list_folder \
    --header "Authorization: Bearer <access-token>" \
    --header "Content-Type: application/json" \
    --data "{\"path\": \"\"}"

With NiFi, we have to use the InvokeHTTP processor. At the time of writing, it is necessary to use an incoming FlowFile to set the content to be sent with a POST request. I will shortly open an issue about that and, hopefully, it should be possible to directly set the body in NiFi 0.7.0.

OK, so to get the content of the request set to

{"path": ""}

a workaround is to use a local file on your computer and to get it with a GetFile processor.

In conclusion, to issue the request to list the content of the folder we need:

list_files

The GetFile processor has the following properties:

  • regarding the scheduling, I set it CRON-based with “*/5 * * * * ?” to issue the HTTP request every 5 seconds
  • regarding the properties:

get_file.PNG

You notice that I reference a specific file in D:\tmp. This file is named list_files.json and its content is:

{"path": ""}

Also, I configure the processor not to delete the file (Keep Source File = true).

Every 5 seconds, a FlowFile with the expected content will be sent to the InvokeHTTP processor which has the following properties:

list_invokehttp.png

I set the method to POST and set the URL endpoint as specified by Dropbox API. I manually set the Content-Type to “application/json” (the content will be set with the content of the incoming FlowFile). And I manually add a property with the key “Authorization” and the value to “Bearer <Access Token>” (to be changed with your access token). This will add the expected header property.
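
For reference, the request sent by this InvokeHTTP processor can be described in Python with urllib from the standard library. The sketch below only builds the request object and does not send it; the function name is hypothetical, the endpoint is the one from the curl command above (it may have changed since the time of writing), and the token is a placeholder.

```python
import json
import urllib.request


def list_folder_request(access_token, path=""):
    # POST request whose body is the JSON document {"path": ""}.
    return urllib.request.Request(
        "https://api.dropboxapi.com/2-beta-2/files/list_folder",
        data=json.dumps({"path": path}).encode("utf-8"),
        headers={
            "Authorization": "Bearer " + access_token,
            "Content-Type": "application/json",
        },
        method="POST",
    )


req = list_folder_request("<access-token>")  # placeholder token
print(req.get_method(), req.full_url, req.data)
```

Passing the object to urllib.request.urlopen would actually issue the call.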

That is it! At this point we are able to list the content of our Dropbox folder. The output of the InvokeHTTP processor (in response relationship) will have a content looking like:

{"entries": 
[
{".tag": "file", "name": "file_3.txt", "path_lower": "/file_3.txt", "path_display": "/file_3.txt", "id": "id:EoMhAg5eUKAAAAAAAAAAAQ", "client_modified": "2016-03-11T12:35:31Z", "server_modified": "2016-03-11T12:35:31Z", "rev": "145c5a2f7", "size": 13}, 
{".tag": "file", "name": "file_1.txt", "path_lower": "/file_1.txt", "path_display": "/file_1.txt", "id": "id:qNPLoI0buzAAAAAAAAAAAg", "client_modified": "2016-03-11T12:35:32Z", "server_modified": "2016-03-11T12:35:32Z", "rev": "245c5a2f7", "size": 13}, 
{".tag": "file", "name": "file_2.txt", "path_lower": "/file_2.txt", "path_display": "/file_2.txt", "id": "id:ib7z-SMEqYAAAAAAAAAAAQ", "client_modified": "2016-03-11T12:35:33Z", "server_modified": "2016-03-11T12:35:33Z", "rev": "345c5a2f7", "size": 13}
], 
"cursor": "AAG4qAdoMOQVhshkMwFchOjPUnMWuGIvrQkZMb3L1aFa9euMXonxsG0S0_RIfxfFxYMxoz2cKFqc3laWcyDdM5MixrJ3AZ4jAyebx5s70k69z6KrBTE_IUh4Vnd2UZCUIAA", "has_more": false}

We can see the three files I have added in my Dropbox folder.

2. Prepare requests to download files

Let’s have a look at the request we have to send to download a file:

curl -X POST https://api.dropboxapi.com/2-beta-2/files/download \
    --header "Authorization: Bearer <access-token>" \
    --header "Dropbox-API-Arg: {\"path\": \"/cupcake.png\"}"

So, we have to split our FlowFile to have one FlowFile per file we want to download. Let’s do that with a SplitJson processor with the following properties:

split_json

As we saw, we have our response with a JSON containing an array (named “entries”) with each of the files present in the Dropbox folder. This processor is designed to output one FlowFile per entry of a JSON array located at the given JSON Path. At this point, we will have one FlowFile per file we want to download. Each one will have as content something like:

{".tag": "file", "name": "file_3.txt", "path_lower": "/file_3.txt", "path_display": "/file_3.txt", "id": "id:EoMhAg5eUKAAAAAAAAAAAQ", "client_modified": "2016-03-11T12:35:31Z", "server_modified": "2016-03-11T12:35:31Z", "rev": "145c5a2f7", "size": 13}

Now we want to extract some values from this JSON content and store them in attributes, so that they can be referenced later with the NiFi Expression Language. For that, we use the EvaluateJsonPath processor with the following properties:

evaluate_json_path

In this case I extract the value associated to “path_lower” in the JSON to set it to an attribute with the key “path_lower”. For example, an output FlowFile will have an attribute with the key “path_lower” and the value “/file_3.txt”.

I also override the attribute “filename” (this attribute already exists since it is a core attribute in NiFi). So far this attribute contained “list_files.json” from my initial FlowFile at the very beginning of the flow. I replace it with the name of the file I want to download (otherwise my files would be downloaded with the name “list_files.json”).
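
Put together, the SplitJson and EvaluateJsonPath steps do roughly what the following Python sketch does on the response body. The function name is made up, and the filter on the “.tag” field is my own addition (the flow itself does not filter).

```python
import json


def files_to_download(list_folder_body):
    # One item per entry of the "entries" array, keeping only the two
    # values stored as attributes: path_lower and the new filename.
    entries = json.loads(list_folder_body)["entries"]
    return [
        {"path_lower": entry["path_lower"], "filename": entry["name"]}
        for entry in entries
        if entry.get(".tag") == "file"  # assumption: skip anything that is not a file
    ]


body = '{"entries": [{".tag": "file", "name": "file_3.txt", "path_lower": "/file_3.txt", "size": 13}], "has_more": false}'
print(files_to_download(body))
```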

The last thing to do before being able to call a new InvokeHTTP processor is to clean the FlowFile content. This is due to the fact that the Dropbox API is not expecting any content in the request and will return an error if there is any.

This is done with a ReplaceText processor with the following properties:

replace_text

Here, I just delete all the content of my FlowFile to only keep the attributes.

3. Download files

We are now ready to configure a last InvokeHTTP processor to download our files.

Note: the Dropbox API is expecting a POST request with no Content-Type otherwise it will return an error. It is kind of a strange behavior but no choice here… It was not possible, with the processor, to have a POST request without Content-Type, so I issued a JIRA and proposed a fix that should be made available in NiFi 0.6.0.

Here are the properties of the processor:

download_invokehttp

I set the method to POST and set the expected endpoint URL. I explicitly set the Content-Type to an empty string. And I manually add two properties that will be sent as headers: one for the authorization (to be changed with your access token) and one with the expected JSON (using the NiFi Expression Language to get the value of the “path_lower” attribute we extracted earlier) to specify the file to download.
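
The shape of this download request can also be sketched in Python with urllib. As before, the request is only built and not sent, the function name is hypothetical, the endpoint comes from the curl command above and the token is a placeholder. Since there is no body, urllib does not add any Content-Type header.

```python
import json
import urllib.request


def download_request(access_token, path_lower):
    # POST with no body and no Content-Type: the file to download is
    # given as JSON in the Dropbox-API-Arg header.
    return urllib.request.Request(
        "https://api.dropboxapi.com/2-beta-2/files/download",
        headers={
            "Authorization": "Bearer " + access_token,
            "Dropbox-API-Arg": json.dumps({"path": path_lower}),
        },
        method="POST",
    )


req = download_request("<access-token>", "/file_3.txt")  # placeholder token
print(req.get_method(), req.header_items())
```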

The final step is to add a PutFile processor to put downloaded files into the wanted location. Here are the properties:

putfile

That’s all! We now have a full flow ready to download files from your Dropbox account!

download_flow.PNG

4. Upload files

From this point, it is easy to have the same logic and to upload files based on the needed request:

curl -X POST https://api.dropboxapi.com/2-beta-2/files/upload \
     --header "Authorization: Bearer <access-token>" \
     --header "Content-Type: application/octet-stream" \
     --header "Dropbox-API-Arg: {\"path\": \"/cupcake.png\", \"mode\": \"overwrite\"}" \
     --data-binary @local-file.png

We just need to link a similar GetFile processor (as described before) with a new InvokeHTTP processor with the following properties:

upload_file.png

This way, it will upload the incoming FlowFile with the correct file name at the root of your Dropbox directory (don’t forget to use your own Dropbox access token).

In conclusion, for upload, the flow is as simple as:

upload_flow.PNG

That is it! I think you have everything you need to enjoy NiFi if you want to play with it to handle content on your Dropbox account.

Here is the template of the full upload/download flow (don’t forget to update it with your access token).

Feel free to add comments and ask questions about this post!

Transform data with Apache NiFi

A few days ago, I started to have a look into Apache NiFi which is now part of the Hortonworks Data Flow distribution (HDF). Based on my experience at Capgemini and the kind of projects in which I have been involved, I immediately realized that it is a powerful system that can be used in a wide range of situations and problems.

Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data. It supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic. Some of the high-level capabilities and objectives of Apache NiFi include:

  • Web-based user interface
    • Seamless experience between design, control, feedback, and monitoring
  • Highly configurable
    • Loss tolerant vs guaranteed delivery
    • Low latency vs high throughput
    • Dynamic prioritization
    • Flow can be modified at runtime
    • Back pressure
  • Data Provenance
    • Track dataflow from beginning to end
  • Designed for extension
    • Build your own processors and more
    • Enables rapid development and effective testing
  • Secure
    • SSL, SSH, HTTPS, encrypted content, etc…
    • Pluggable role-based authentication/authorization

It should also be noted that it is really easy to get started with NiFi, whatever OS you are using: just download it, run it, and open your favorite web browser at localhost:8080.

Now you may tell yourself that it is just a tool to get data from a point A to a point B according to parameters, conditions, etc. In other words, you may think that it is just a tool to extract and load data. What are the transformation features offered by NiFi?

Just have a look at the list of 125+ processors available at this moment and you will have a good idea of what you can achieve. But let’s say you need to perform some very specific actions on your data and you don’t see any processor that suits your need. So what’s next?

First, you can write your own processor; it is easy and straightforward (see the NiFi developer guide). Otherwise, you can leverage some existing processors to execute custom code. For example, you can achieve a lot with the ExecuteScript and ExecuteStreamCommand processors. If you are interested in the first one, have a look at this blog to find useful and complete examples.

In this post, I want to focus on ExecuteStreamCommand and how it can be used to define data transformation flows. One common use case I see is to get files from one place and execute an application to transform the files.

For simplicity, let’s say I have an input directory where files to be processed arrive. For each file A in this directory, I want to execute an application that transforms its data into a new file B:

flow_AtoB

This can be easily achieved by combining two processors: ListFile and ExecuteStreamCommand.

Here is an example running on Windows: I look for any new file in “D:\tmp-input” with a ListFile processor using the following properties:

listfiles

For each new file arriving in this directory, the processor will generate a FlowFile (see the NiFi documentation to learn about NiFi principles) with some useful attributes and no content.
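As an illustration of the idea, here is a minimal Python sketch of a listing step that emits one attribute dictionary per new file and never reads the file content. The function list_new_files and the in-memory set used to remember listed files are assumptions of this sketch; the real processor keeps its own state, and the details of how it does so are not covered here.

```python
import os
import tempfile


def list_new_files(directory: str, already_listed: set) -> list:
    """Return one attribute dictionary per file not seen in a previous call."""
    flow_files = []
    for name in sorted(os.listdir(directory)):
        full_path = os.path.join(directory, name)
        if not os.path.isfile(full_path) or full_path in already_listed:
            continue
        already_listed.add(full_path)
        flow_files.append({
            "filename": name,
            # Directory of the file, ending with a path separator
            "absolute.path": os.path.abspath(directory) + os.sep,
        })
    return flow_files


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as input_dir:
        listed = set()
        open(os.path.join(input_dir, "a.txt"), "w").close()
        print(list_new_files(input_dir, listed))
        print(list_new_files(input_dir, listed))  # nothing new on the second pass
```

Only attributes are produced here, which mirrors the "attributes and no content" behaviour described above.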

Now, I have a batch file that I want to execute on each file. This batch file takes exactly one parameter, which is the path of the file to be processed.

My batch file (command.bat) is very simple and moves the given file into another directory (obviously, this is just an example; if we just wanted to move files, NiFi could do that without executing commands!):

@echo off
REM Move the file given as 1st argument to D:\tmp and hide the command output
MOVE %1 "D:\tmp" >nul

Then my ExecuteStreamCommand will have the following properties:

executeStreamCommand

Using the Expression Language provided with NiFi, I can build the path of the file previously listed by the ListFile processor from the FlowFile attributes:

${absolute.path}${filename}

This is the concatenation of the attributes “absolute.path” and “filename” from the incoming FlowFile (attributes set by the ListFile processor).
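To show what this concatenation amounts to, here is a simplified Python emulation of attribute substitution. It only handles plain ${name} references and is not NiFi’s Expression Language implementation; the function name evaluate and the sample attribute values are hypothetical, and replacing an unknown attribute with an empty string is an assumption of this sketch.

```python
import re


def evaluate(expression: str, attributes: dict) -> str:
    """Replace each ${name} reference with the matching attribute value."""
    def lookup(match):
        # Unknown attributes are replaced by an empty string in this sketch
        return attributes.get(match.group(1), "")
    return re.sub(r"\$\{([^}]+)\}", lookup, expression)


if __name__ == "__main__":
    attributes = {"absolute.path": "D:\\tmp-input\\", "filename": "report.txt"}
    print(evaluate("${absolute.path}${filename}", attributes))
```

Because “absolute.path” ends with a path separator in this example, the two attributes can be concatenated directly, with no separator in between.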

The processor can send the content of the incoming FlowFile to the executed process, but in my case there is no content and I don’t want that (Ignore STDIN = true). Besides, this processor can create a new FlowFile using the output of the command as its content. This is not what I want with this command, so I set the property “Output Destination Attribute” to the value “result”. This way, the output of my command is stored as a new (or updated) attribute of my original FlowFile.
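The behaviour described above can be approximated in Python as follows. This is only a sketch under stated assumptions: the function execute_stream_command is hypothetical, the command is a cross-platform stand-in for the batch file, and the attribute “execution.status” is the one mentioned in the side note at the end of this post.

```python
import subprocess
import sys


def execute_stream_command(attributes: dict, command: list,
                           destination_attribute: str = "result") -> dict:
    """Run a command and store its output and return code as attributes."""
    completed = subprocess.run(
        command,
        stdin=subprocess.DEVNULL,   # nothing is sent to the process (Ignore STDIN = true)
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    updated = dict(attributes)      # the original attributes are kept
    updated[destination_attribute] = completed.stdout
    updated["execution.status"] = str(completed.returncode)
    return updated


if __name__ == "__main__":
    flow_file = {"filename": "a.txt"}
    # Stand-in for command.bat so that the sketch runs on any OS
    command = [sys.executable, "-c", "print('moved')"]
    print(execute_stream_command(flow_file, command))
```

The output ends up in an attribute of the same attribute set instead of becoming the content of a new object, which is the point of setting a destination attribute.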

If you don’t need this FlowFile anymore, you can auto-terminate the relationships of the processor, and you have a ready-to-go flow for processing files using your own command.

If you want another process to run on the newly created file (in D:\tmp), you can copy/paste this flow logic and use another ListFile processor to scan the directory.

basic_flow.PNG

So far, it is a very basic operation. Something closer to a real use case would be a flow where the ExecuteStreamCommand processor gets information back from the executed command and passes it to the next processor.

Let’s change my batch file so that it returns the parameters to be passed to the next processor. In my example, the batch file moves the file given as the first parameter to the file path given as the second parameter. At the end, it displays the new path of the file (second parameter) and the third parameter, to be used by the next processor (again, this is an example; in the real world, such parameters would probably be computed by the executed process itself).

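@echo off
REM Append all received arguments to the input file
ECHO %* >> %1
REM Move the input file (1st argument) to the destination (2nd argument)
MOVE %1 %2 >nul
REM Print the new path and the 3rd argument, separated by ";"
ECHO "%2;%3"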

Note: since I configured my processor to accept arguments delimited with “;”, I display the parameters to be passed on with this delimiter. It is possible to change the delimiter in the processor properties (I recommend reading the usage documentation associated with each processor; it is very helpful).
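As a small illustration of why the delimiter matters, here is a Python sketch of splitting an argument string on a configurable delimiter. The function split_arguments and the sample paths are hypothetical and only mimic the idea; they are not the processor’s actual parsing code.

```python
def split_arguments(command_arguments: str, delimiter: str = ";") -> list:
    """Split the configured argument string into individual arguments."""
    if not command_arguments:
        return []
    return command_arguments.split(delimiter)


if __name__ == "__main__":
    # With ";" as delimiter, a path containing a space stays in one piece
    print(split_arguments("D:\\tmp-input\\my file.txt;D:\\step1.txt;D:\\step2.txt"))
    # With a space as delimiter, the same path would be cut in two
    print(split_arguments("D:\\tmp-input\\my file.txt", " "))
```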

Let’s have a look at the configuration of my first ExecuteStreamCommand processor:

first_execute

This will execute my batch file with:

  • 1st parameter: path of my input file listed by the ListFile processor
  • 2nd parameter: destination path of my file
  • 3rd parameter: parameter to be passed to the next processor

My batch file, once called, will move my file to “D:\step1.txt” and will display:

“D:\\step1.txt;D:\\step2.txt”

This will be set as the value of the attribute with the key “result” in the generated FlowFile.

The second ExecuteStreamCommand processor has the following configuration:

second_execute

Here, I use an Expression Language function (replaceAll) to remove the special characters introduced in the process (it is possible to adapt the batch script to avoid this operation).
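The exact expression is only visible in the processor configuration, so the following Python sketch merely illustrates the kind of cleanup involved. It assumes that the special characters to remove are the double quotes printed by ECHO and the trailing line terminator; the function name clean_command_output is hypothetical.

```python
import re


def clean_command_output(result: str) -> str:
    """Remove double quotes and line terminators from the captured output."""
    return re.sub(r'["\r\n]', "", result).strip()


if __name__ == "__main__":
    raw = '"D:\\\\step1.txt;D:\\\\step2.txt"\r\n'
    cleaned = clean_command_output(raw)
    print(cleaned)
    # The cleaned value can then be split on the argument delimiter
    print(cleaned.split(";"))
```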

This will execute my batch file with:

  • 1st parameter: D:\\step1.txt
  • 2nd parameter: D:\\step2.txt
  • 3rd parameter: null

My batch file, once called, will move my file to “D:\step2.txt” and will display:

“D:\\step2.txt”

In conclusion, by taking advantage of the Expression Language and by controlling the output of the executed commands, I can use the FlowFile initially created by the ListFile processor to carry information along the flow and propagate arguments to the following steps.

For a better understanding, here is a picture of how the flow file has evolved through the flow:

flow.PNG

Between the ListFile processor and the first ExecuteStreamCommand processor, the flow file has the following attributes:

flow_file_1

Then between the two ExecuteStreamCommand processors:

flow_file_2

And at the end of the flow (if we want to add other processors):

flow_file_3

As a side note, we can see that the return code of the executed command is available in the FlowFile as the attribute “execution.status”. This makes it easy to route FlowFiles depending on the return code using the RouteOnAttribute processor. Thus, it is possible to implement “catch” / “retry” strategies in case of error.
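A routing decision of this kind could look like the following Python sketch. It is not a RouteOnAttribute configuration: the function route_on_status, the relationship names and the “retry.count” attribute are all hypothetical and only illustrate one possible “catch” / “retry” strategy.

```python
def route_on_status(attributes: dict, max_retries: int = 3) -> str:
    """Pick a relationship name from the return code of the command."""
    if attributes.get("execution.status") == "0":
        return "success"
    # "retry.count" is a hypothetical attribute counting previous attempts
    retries = int(attributes.get("retry.count", "0"))
    if retries < max_retries:
        return "retry"
    return "failure"


if __name__ == "__main__":
    print(route_on_status({"execution.status": "0"}))
    print(route_on_status({"execution.status": "1", "retry.count": "1"}))
    print(route_on_status({"execution.status": "1", "retry.count": "3"}))
```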

Please feel free to comment/ask questions about this post!