List/Fetch pattern and Remote Process Group in Apache NiFi

I see a lot of questions about how the List[X]/Fetch[X] processors work and how to load-balance data over the nodes of a NiFi cluster once the data is already in the cluster. Since the question comes up quite often, let’s discuss the subject and try to understand how things work here.

I will assume that you are running a NiFi cluster, since there is no data balancing problem with a standalone instance 😉

The first thing to understand is: when running a cluster, one of the nodes is randomly designated as the “Primary node”. The election takes place when the cluster starts, and there is no way to decide which node will be the primary node. OK… you could force things when your cluster starts, but there is no point in doing so if you want real high availability. So the short version is: any node may become the Primary node at some point, and you should not assume that the Primary node will be one given node in particular.

Second thing to understand is: the flow that you are designing on your canvas is running on all the nodes independently. Each node of the cluster is responsible for its own data, and a relationship between two processors does not mean that the data going into this relationship will be balanced over the nodes. Unless you use a Remote Process Group (see below), the data will remain on the same node from the beginning to the end of the flow.

I will use the following example to illustrate my explanations: I want to get files from a remote SFTP server and put them into HDFS.

  • First idea (bad idea!) / GetSFTP -> PutHDFS

Screen Shot 2017-02-23 at 11.04.42 AM.png

The first option could be the Get/Put pattern, which is perfectly fine with a standalone instance. However, this will cause issues if you have a NiFi cluster. Remember? The flow is running on all hosts of your cluster. The problem is that you will have concurrent accesses from your nodes to the same files on the SFTP server… and if the processor is configured to delete the file once retrieved (the default behavior), you will see errors showing up. Conclusion: always keep in mind that a processor is running on all the nodes and can cause concurrent access errors depending on the remote system.

  • Second idea (not efficient!) / GetSFTP on Primary Node -> PutHDFS

The second option is to configure the GetSFTP processor to only run on the Primary Node (in the Scheduling tab of the processor configuration):

Screen Shot 2017-02-23 at 11.10.12 AM.png

This way, you will solve the concurrent accesses since only one node of your cluster (the Primary node) will run the GetSFTP processor.

Brief aside: remember, the flow is running on all the nodes; however, if the processor is configured to run on the primary node only, it won’t be scheduled on the nodes that are not the primary node. That’s all.

The problem with this approach is that it is not efficient at all. First, you get data from only one node (this does not scale), and, in the end, only the primary node of your cluster is actually handling the data. Why? Because, unless you explicitly use a remote process group, the data will remain on the same node from the beginning to the end. In this case, only the primary node will actually get data from the SFTP server and push it into HDFS.

  • Recommended pattern : ListSFTP -> RPG / Input Port -> FetchSFTP -> PutHDFS

To solve these issues, the List/Fetch pattern has been developed and is widely used for a lot of processors. The idea is the following: the List processor will only list the data to retrieve on the remote system and get the associated metadata (it will not get the data itself). For each listed item, a flow file (with no content) will be generated and its attributes will be populated with the metadata. Then the flow file is sent to the Fetch processor to actually retrieve the data from the remote system based on the metadata (the path of the file on the remote system, for example). Since each flow file contains the metadata of a specific item on the remote system, you won’t have concurrent accesses even if you have multiple Fetch processors running in parallel.

Obviously the List processor is meant to run on the Primary node only. Then you have to balance the generated flow files over the nodes so that the Fetch processor on each node has flow files to deal with. For this purpose you have to use a Remote Process Group.

A Remote Process Group is an abstract object used to connect two NiFi setups together (the communication between the two NiFi instances is what we call Site-to-Site, or S2S). It can be a MiNiFi instance to a NiFi cluster, a NiFi cluster to another NiFi cluster, a standalone NiFi to a NiFi cluster, etc. And it can also be used to connect a NiFi cluster to itself! This way the flow files will be balanced over all the nodes of the cluster. A few things to know about a Remote Process Group:

  1. You need to have an input port on the remote instance you are connecting to (in our case, you need a remote input port on your canvas).
  2. The address you give when configuring your remote process group does not matter in terms of high availability: once the connection is established with one of the nodes of the remote instance, the remote process group will be aware of all the nodes of the remote instance and will manage the case where the node specified in the address goes down.
  3. Your instances need to be configured to allow remote access. The required properties are:

# Site to Site properties
nifi.remote.input.host=
nifi.remote.input.secure=
nifi.remote.input.socket.port=
nifi.remote.input.http.enabled=
nifi.remote.input.http.transaction.ttl=
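As an illustration, on an unsecured cluster these could look like the following (example values, to adapt to your own hostnames and ports):

nifi.remote.input.host=node-1
nifi.remote.input.secure=false
nifi.remote.input.socket.port=9998
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec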

In the case of our SFTP example, it looks like:

Screen Shot 2017-02-23 at 11.47.40 AM.png

Let’s try to understand what is going on from a cluster perspective. Here is what we have in the case of a 3-node NiFi cluster with ListSFTP running on the primary node only:

Screen Shot 2017-02-23 at 12.03.22 PM.png

When scheduled, ListSFTP is going to list the three files on my remote SFTP server and will generate one flow file for each remote file. Each flow file won’t have any content but will have attributes populated with the metadata of the remote file. In the case of ListSFTP, I’ll have (check the documentation at the “Writes Attributes” paragraph):

  • sftp.remote.host: The hostname of the SFTP Server
  • sftp.remote.port: The port that was connected to on the SFTP Server
  • sftp.listing.user: The username of the user that performed the SFTP Listing
  • file.owner: The numeric owner id of the source file
  • file.group: The numeric group id of the source file
  • file.permissions: The read/write/execute permissions of the source file
  • file.lastModifiedTime: The timestamp of when the file in the filesystem was last modified, as ‘yyyy-MM-dd’T’HH:mm:ssZ’
  • filename: The name of the file on the SFTP Server
  • path: The fully qualified name of the directory on the SFTP Server from which the file was pulled

The ListSFTP processor will generate 3 flow files and, for now, all flow files are only on the primary node:

Screen Shot 2017-02-23 at 1.59.43 PM.png

Now the Remote Process Group has been configured to connect to the cluster itself, and I set the relationship going from ListSFTP to the Remote Process Group to connect with the input port I created (you may have multiple input ports in the remote system and you can choose which one to connect to, depending on your needs). Once the RPG (Remote Process Group) has communication enabled, it is aware of the three nodes and will balance the data over the remote nodes (be aware that there are a lot of Site-to-Site parameters to tune efficiency). In my case that would give something like:

Screen Shot 2017-02-23 at 1.59.55 PM.png

Note: that would be an ideal case in terms of balancing but, for efficiency purposes, the Site-to-Site mechanism might send batches of flow files to the remote node. In the above example, with only 3 flow files, I would probably not end up with one flow file per node.

Now, since we have everything in the attributes of our flow files, we need to use the Expression Language to set the properties of the FetchSFTP processor to use the attributes of the incoming flow files:

screen-shot-2017-02-23-at-1-55-13-pm
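For reference, such a FetchSFTP configuration typically maps the attributes to the processor properties, roughly as follows (a sketch; property names may vary slightly depending on your NiFi version, and credentials are yours to set):

Hostname: ${sftp.remote.host}
Port: ${sftp.remote.port}
Username: <your SFTP user>
Remote File: ${path}/${filename}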

This way, each instance of the FetchSFTP processor will take care of its own file (to actually fetch the content of the remote data) and there won’t be any concurrent access:

Screen Shot 2017-02-23 at 2.00.12 PM.png

All your nodes are retrieving data, and you can really scale your cluster up depending on your requirements. Note also that PutHDFS won’t be an issue either, since each node will write its own files.

As I said previously, a lot of processors embrace this pattern (and this is the recommended way to use such processors with a NiFi cluster), and I’d strongly encourage you to do the same when developing your custom processors.

As always questions/comments are welcome.

Apache NiFi 1.0.0 – Cluster setup

As you may know, version 1.0.0-BETA of Apache NiFi was released a few days ago. The upcoming 1.0.0 release will be a great moment for the community as it will mark a lot of work over the last few months, with many new features being added.

The objective of the Beta release is to give people a chance to try this new version and to give feedback before the official major release, which will come shortly. If you want to preview this new version with a completely new look, you can download the binaries here, unzip them, and run NiFi (‘./bin/nifi.sh start‘ or ‘./bin/run-nifi.bat‘ for Windows), then you just have to access http://localhost:8080/nifi/.

The objective of this post is to briefly explain how to setup an unsecured NiFi cluster with this new version (a post for setting up a secured cluster will come shortly with explanations on how to use a new tool that will be shipped with NiFi to ease the installation of a secured cluster).

One really important change with this new version is the new paradigm around cluster installation. From the NiFi documentation, we can read:

Starting with the NiFi 1.0 release, NiFi employs a Zero-Master Clustering paradigm. Each of the nodes in a NiFi cluster performs the same tasks on the data but each operates on a different set of data. Apache ZooKeeper elects one of the nodes as the Cluster Coordinator, and failover is handled automatically by ZooKeeper. All cluster nodes report heartbeat and status information to the Cluster Coordinator. The Cluster Coordinator is responsible for disconnecting and connecting nodes. As a DataFlow manager, you can interact with the NiFi cluster through the UI of any node in the cluster. Any change you make is replicated to all nodes in the cluster, allowing for multiple entry points to the cluster.

zero-master-cluster

OK, let’s start with the installation. As you may know it is greatly recommended to use an odd number of ZooKeeper instances with at least 3 nodes (to maintain a majority also called quorum). NiFi comes with an embedded instance of ZooKeeper, but you are free to use an existing cluster of ZooKeeper instances if you want. In this article, we will use the embedded ZooKeeper option.

I will use my computer as the first instance. I also launched two virtual machines (with a minimal CentOS 7). All my 3 instances are able to communicate with each other on the required ports. On each machine, I configure my /etc/hosts file with:

192.168.1.17 node-3
192.168.56.101 node-2
192.168.56.102 node-1

I deploy the binaries on my three instances and unzip them. I now have a NiFi directory on each one of my nodes.

The first thing is to configure the list of the ZK (ZooKeeper) instances in the configuration file ‘./conf/zookeeper.properties‘. Since our three NiFi instances will run the embedded ZK instance, I just have to complete the file with the following properties:

server.1=node-1:2888:3888
server.2=node-2:2888:3888
server.3=node-3:2888:3888

Then, everything happens in the ‘./conf/nifi.properties‘. First, I specify that NiFi must run an embedded ZK instance, with the following property:

nifi.state.management.embedded.zookeeper.start=true

I also specify the ZK connect string:

nifi.zookeeper.connect.string=node-1:2181,node-2:2181,node-3:2181

As you can notice, the ./conf/zookeeper.properties file has a property named dataDir. By default, this value is set to ./state/zookeeper. If more than one NiFi node is running an embedded ZK, it is important to tell each server which one it is.

To do that, you need to create a file named myid and place it in ZK’s data directory. The content of this file should be the index of the server as previously specified by the server.<number> properties.

On node-1, I’ll do:

mkdir ./state
mkdir ./state/zookeeper
echo 1 > ./state/zookeeper/myid

The same operation needs to be done on each node (don’t forget to change the ID).

If you don’t do this, you may see the following kind of exceptions in the logs:

Caused by: java.lang.IllegalArgumentException: ./state/zookeeper/myid file is missing

Then we go to clustering properties. For this article, we are setting up an unsecured cluster, so we must keep:

nifi.cluster.protocol.is.secure=false

Then, we have the following properties:

nifi.cluster.is.node=true
nifi.cluster.node.address=node-1
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

I set the FQDN of the node I am configuring, and I choose the arbitrary 9999 port for the communication with the elected cluster coordinator. I apply the same configuration on my other nodes:

nifi.cluster.is.node=true
nifi.cluster.node.address=node-2
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

and

nifi.cluster.is.node=true
nifi.cluster.node.address=node-3
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

We have configured the exchanges between the nodes and the cluster coordinator, now let’s move to the exchanges between the nodes (to balance the data of the flows). We have the following properties:

nifi.remote.input.host=node-1
nifi.remote.input.secure=false
nifi.remote.input.socket.port=9998
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec

Again, I set the FQDN of the node I am configuring and I choose the arbitrary 9998 port for the Site-to-Site (S2S) exchanges between the nodes of my cluster. The same applies for all the nodes (just change the host property with the correct FQDN).

It is also important to set the FQDN for the web server property, otherwise we may get strange behaviors with all nodes identified as ‘localhost’ in the UI. Consequently, for each node, set the following property with the correct FQDN:

nifi.web.http.host=node-1

And that’s all! Easy, isn’t it?

OK, let’s start our nodes and let’s tail the logs to see what’s going on there!

./bin/nifi.sh start && tail -f ./logs/nifi-app.log

If you look at the logs, you should see that one of the nodes gets elected as the cluster coordinator, and then you should see heartbeats created by the three nodes and sent to the cluster coordinator every 5 seconds.

You can connect to the UI using the node you want (you can have multiple users connected to different nodes, modifications will be applied on each node). Let’s go to:

http://node-2:8080/nifi

Here is what it looks like:

Screen Shot 2016-08-13 at 7.33.08 PM

As you can see in the top-left corner, there are 3 nodes in our cluster. Besides, if we go in the menu (button in the top-right corner) and select the cluster page, we have details on our three nodes:

Screen Shot 2016-08-13 at 7.35.28 PM

We see that my node-2 has been elected as cluster coordinator, and that my node-3 is my primary node. This distinction is important because some processors must run on a unique node (for data consistency) and in this case we will want it to run “On primary node” (example below).

We can display details on a specific node (“information” icon on the left):

Screen Shot 2016-08-13 at 7.35.48 PM

OK, let’s add a processor like GetTwitter. Since the flow will run on all nodes (with balanced data between the nodes), this processor must run on a single node if we don’t want to duplicate data. So, for the scheduling strategy, we choose “On primary node”. This way, we don’t duplicate data, and if the primary node changes (because the node dies or gets disconnected), we won’t lose data: the workflow will still be executed.

Screen Shot 2016-08-13 at 7.45.19 PM

Then I can connect my processor to a PutFile processor to save the tweets in JSON by setting a local directory (/tmp/twitter):

Screen Shot 2016-08-13 at 7.52.25 PM

If I run this flow, all my JSON tweets will be stored on the primary node; the data won’t be balanced. To balance the data, I need to use an RPG (Remote Process Group): the RPG will connect to the coordinator to evaluate the load of each node and balance the data over the nodes. It gives us the following flow:

Screen Shot 2016-08-13 at 8.00.26 PM

I have added an input port called “RPG”, then I have added a Remote Process Group that I connected to “http://node-2:8080/nifi” and I enabled transmission so that the Remote Process Group becomes aware of the existing input ports on my cluster. Then, in the Remote Process Group configuration, I enabled the RPG input port. I then connected my GetTwitter to the Remote Process Group and selected the RPG input port. Finally, I connected my RPG input port to my PutFile processor.

When running the flow, I now have balanced data all over my nodes (I can check in the local directory ‘/tmp/twitter‘ on each node).

That’s all for this post. I hope you enjoyed it and that it will be helpful to you when setting up a NiFi cluster. All comments/remarks are very welcome, and I kindly encourage you to download Apache NiFi, try it, and give feedback to the community if you have any.

Apache NiFi – MiNiFi is (almost) out!

This is quite a busy period for the Apache NiFi community: Apache NiFi 0.7.0 is about to be released (RC2 is coming), Apache NiFi 1.0.0 will probably be ready in the next few weeks (stay tuned) and… Apache MiNiFi 0.0.1 will officially be released next week (RC vote in progress and so far so good)! This is a great step for the community and I wanted to write a quick article about this new amazing tool!

Here is my step-by-step article to get your hands on MiNiFi and play with it. But first of all, if you want to know more about this great subproject, have a look here: https://nifi.apache.org/minifi/index.html

  • Build MiNiFi from sources

I downloaded the sources from this link: https://dist.apache.org/repos/dist/dev/nifi/minifi-0.0.1/minifi-0.0.1-source-release.zip

And built the sources using maven: mvn clean install.

There are two convenience binaries generated as part of this process: the MiNiFi assembly and the MiNiFi Toolkit assembly.

  • Convert and validate MiNiFi templates

The toolkit can be used to convert a NiFi template (XML) into a MiNiFi template (YAML) or to validate a MiNiFi template.

For this process, I created a very simple template using NiFi. This template is made of a GenerateFlowFile processor to generate flow files with no content every 5 seconds, and an AttributesToJSON processor to extract the attributes of the flow file and generate a JSON document as the flow file content. It is then connected to a Remote Process Group pointing to a local running NiFi instance configured to allow Site-to-Site communication.

minifi-flow

I then saved this template as a XML file (NiFi template).

If I look at the toolkit usage (in a Windows environment):

minifi-toolkit-0.0.1\bin> .\config.bat
Usage:

java org.apache.nifi.minifi.toolkit.configuration.ConfigMain <command> options

Valid commands include:
transform: Transform template xml into MiNiFi config YAML
validate: Validate config YAML

I now use the toolkit to convert my template:

.\config.bat transform MiNiFi-test.xml MiNiFi-test.yml

And I can also validate the generated configuration file:

.\config.bat validate MiNiFi-test.yml

Note: here is the working configuration I then used in MiNiFi.

  • Run MiNiFi

I now take the generated configuration and use it to replace the default one:

minifi-0.0.1\conf\config.yml

And I now start MiNiFi:

minifi-0.0.1\bin\run-minifi.bat
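Note: on Linux or macOS, the equivalent should be the shell script shipped in the same directory of the convenience binary, something like:

./bin/minifi.sh start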

By looking at the logs (minifi-0.0.1\logs\minifi-app.log), we can see all the processors that are shipped with MiNiFi. Here is the list of standard processors currently included with MiNiFi:

    org.apache.nifi.processors.standard.PostHTTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.RouteOnContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.FetchFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.EvaluateXPath || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.SplitContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListSFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ReplaceText || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.MergeContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ConvertCharacterSet || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutDistributedMapCache || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.HandleHttpRequest || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutSyslog || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.CompressContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ParseSyslog || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.RouteOnAttribute || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ModifyBytes || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ControlRate || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.HashAttribute || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.Base64EncodeContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.TailFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetHTTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.HashContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.EvaluateXQuery || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.IdentifyMimeType || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetJMSQueue || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListenTCP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.FetchDistributedMapCache || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutJMS || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.SplitXml || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.EvaluateRegularExpression || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListenSyslog || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ScanContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ConvertJSONToSQL || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.EncryptContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.FetchSFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.attributes.UpdateAttribute || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-update-attribute-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetJMSTopic || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ReplaceTextWithMapping || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.SplitJson || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.TransformXml || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.EvaluateJsonPath || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ExecuteProcess || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.MonitorActivity || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ValidateXml || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ExecuteSQL || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.SegmentContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutSFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ExecuteStreamCommand || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.LogAttribute || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.DistributeLoad || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GenerateFlowFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListenHTTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListenUDP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutSQL || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.RouteText || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListenRELP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.InvokeHTTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ExtractText || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.UnpackContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.AttributesToJSON || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutEmail || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.DetectDuplicate || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ScanAttribute || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.SplitText || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetSFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.HandleHttpResponse || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.DuplicateFlowFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.QueryDatabaseTable || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]

As you can see, we can, with a default installation, do a lot of amazing things!!!

Once MiNiFi is started, the configured flow will be automatically started, and I am now able to see in my running NiFi instance all the files I received from the MiNiFi instance through Site-to-Site communication into the input port I configured:

nifi-with-minifi

For the purpose of the demonstration, the above screenshot shows all the elements involved:

  • I used the red-circled elements to generate a NiFi template, convert it into a MiNiFi template and get it running into a MiNiFi 0.0.1 instance.
  • The green-circled part is the only part really running on my NiFi instance and it is receiving all the flow files generated by the MiNiFi instance.

That’s all for this quick demonstration of this new tool. Hope you enjoyed it and that you will give it a try!

  • Conclusion

MiNiFi is a very lightweight tool (about 40 MB!) that can easily be deployed on a lot of remote machines/servers as a single running instance to collect and quickly process data, or to act as a remote relay to communicate with NiFi. Think of MiNiFi as an agent acting immediately at, or directly adjacent to, source sensors, systems, or servers.

It is really easy to use and deploy and will, hopefully, be widely used in the Internet of Things! Besides, it comes with the same top-level security features as NiFi! A must!

US presidential election via Twitter using Apache NiFi, Spark, Hive and Zeppelin

This article describes a frequency and sentiment analysis based on real-time tweet streams in relation to the four main candidates in the US Presidential Election.

The main objective was to deploy and test the available connector between Apache NiFi and Apache Spark, so I decided to implement the following use case.

In the end, I get real-time analytics such as:

  • frequency of tweets over time per candidate
  • percentage of negative, positive and neutral tweets per candidate
  • opinion trends over time for each candidate

The article is available on the Hortonworks Community Connection website. And as always, please feel free to comment and/or ask questions.

OAuth 1.0A with Apache NiFi (Twitter API example)

A lot of APIs use the OAuth protocol to authorize received requests and to check that everything is OK regarding the identity of the request sender.

OAuth is an open standard for authorization, commonly used as a way for Internet users to log into third party websites using their Microsoft, Google, Facebook, Twitter, One Network etc. accounts without exposing their password. Generally, OAuth provides to clients a “secure delegated access” to server resources on behalf of a resource owner. It specifies a process for resource owners to authorize third-party access to their server resources without sharing their credentials. Designed specifically to work with Hypertext Transfer Protocol (HTTP), OAuth essentially allows access tokens to be issued to third-party clients by an authorization server, with the approval of the resource owner. The third party then uses the access token to access the protected resources hosted by the resource server.

As a remark, there are two versions of the protocol currently used out there: 1.0A and 2.0. As far as I know, 1.0A is the most commonly used. I already faced the need to use OAuth 1.0A protocol with the Flickr API but, back then, I found a way to get my data differently.

Recently, a question was asked on the Hortonworks Community Connection regarding the use of Apache NiFi to get data from Twitter API using OAuth 1.0A protocol. So this time, I decided to have a look on this and to get the job done.

This post presents the flow I used to perform a request against the Twitter API using the OAuth protocol. It gave me the opportunity to use for the first time the ExecuteScript processor, which allows users to execute custom scripts on the fly inside NiFi (you will find a lot of examples on this great site).

Note 1: this was the first time I used Groovy language, be nice with me!

Note 2: I didn’t test the flow on a lot of methods. Some modifications may be necessary for some cases.

OK. The objective was to request the “users/lookup” method of the Twitter API. You can read the documentation here.

I want to perform an HTTP GET on:

https://api.twitter.com/1.1/users/lookup.json?screen_name=twitterapi,twitter

So far it seems really easy to do with a simple InvokeHTTP processor. The thing is you need to identify yourself when sending the request. Here comes the OAuth protocol. The official specification for 1.0A can be read here. But in the case of the Twitter API, you have a nice documentation here.

Besides, on the documentation page of each method, you have an OAuth Signature Generator that can be accessed (if you have defined a Twitter App). The generator is here. It lets you play around and gives great insights on each request to debug your own implementation of the OAuth protocol.

The global idea is: you have a lot of input parameters and you must follow the specification to construct a string based on these parameters. This string will be the value associated with the “Authorization” key in the HTTP header properties.

Here is the list of the needed parameters. First, the parameters directly linked to your request (these are derived from the target URL):

  • HTTP method (GET in our case)
  • Base URL (the URL without any argument)
  • Arguments (the query parameters of the request)

Then the global parameters related to OAuth:

  • Consumer key (private information of your app provided by Twitter)
  • Consumer secret (private information of your app provided by Twitter)
  • Nonce (random string, uniquely generated for each request)
  • Signature method (with Twitter it is HMAC-SHA1)
  • Timestamp (in seconds)
  • Token (private information of your app provided by Twitter)
  • Token secret (private information of your app provided by Twitter)
  • Version (in this case 1.0)

The first step is to construct the “signature base string“. For that you first need to create the “parameter string“. All is very well explained here. Once you have the signature base string, you can sign it using HMAC-SHA1 and you easily get the header property to set in your HTTP request:

Authorization: OAuth oauth_consumer_key="*******", oauth_nonce="a9ab2392e5158a4c4e029c7829164571", oauth_signature="4s4Hi5hQ%2FoLKprW7qsRlImds3Og%3D", oauth_signature_method="HMAC-SHA1", oauth_timestamp="1460453975", oauth_token="*******", oauth_version="1.0"

Let’s get into the details using Apache NiFi. Here is the flow I created:

oauthFlow

I use a GenerateFlowFile to generate an empty Flow File (FF) in order to execute my flow. Then I use an UpdateAttribute processor to add attributes to my FF. In this case, I only add the parameters related to the specific request I want to execute:

globalParam

Then I send my FF into a process group that will compute the header property to set (I will come back to this part later). Then I perform my request using the InvokeHTTP processor:

invokeHTTP

I set the method to GET, the URL to my corresponding FF attribute, the content type to text/plain, and I add a custom property named Authorization with the FF attribute I created in my process group (see below). This custom property will be added as an HTTP header to the request. At the end, I use a PutFile processor to write the result of my request to a local directory.
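In short, the interesting InvokeHTTP properties are roughly the following (a sketch; the attribute holding the target URL is called url here purely for illustration, and oauth_header is the attribute created in the process group described below):

HTTP Method: GET
Remote URL: ${url}
Content-Type: text/plain
Authorization (custom property): ${oauth_header}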

Let’s go to the interesting part of our flow where all the magic is: the process group I named OAuth 1.0A. Here it is:

processGroup

I just use two processors. The first one is an UpdateAttribute processor to add all the parameters I need as attributes of my FF. The second one is an ExecuteScript processor where I put my Groovy code to compute the header property.

First… the parameters:

oauthParameters

Note: I use Expression Language provided by NiFi for some attributes.

  • arguments is used to extract the argument part of my target URL. In this example: screen_name=twitterapi,twitter
  • base_url is the URL I request without any argument. In this example: https://api.twitter.com/1.1/users/lookup.json
  • For the nonce parameter, I use the “UUID” method of the Expression Language, which generates a random string, and I just replace the ‘-‘ characters to only keep an alphanumeric string.
  • For the timestamp, I use the “now” method of the Expression Language and I use substring to only keep seconds (see the sketch below).
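As an illustration, the Expression Language for these attributes could look like the following (assuming the full target URL was stored earlier in an attribute named url; adapt the names to your own flow):

arguments: ${url:substringAfter('?')}
base_url: ${url:substringBefore('?')}
oauth_nonce: ${UUID():replaceAll('-', '')}
oauth_timestamp: ${now():toNumber():toString():substring(0, 10)}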

Let’s move to the ExecuteScript part. I set the script engine to Groovy and I put my script code in the “script body” property. The full code is available at the end of the post. Let’s go through it piece by piece.

First thing, I want to trigger my code only when a FF is available:

def flowFile = session.get()
if (!flowFile) return

Then I define a method I will use for the HMAC-SHA1 encoding:

def static hmac(String data, String key) throws java.security.SignatureException
{
    String result
    try {
        // get an hmac_sha1 key from the raw key bytes
        SecretKeySpec signingKey = new SecretKeySpec(key.getBytes(), "HmacSHA1");
        // get an hmac_sha1 Mac instance and initialize with the signing key
        Mac mac = Mac.getInstance("HmacSHA1");
        mac.init(signingKey);
        // compute the hmac on input data bytes
        byte[] rawHmac = mac.doFinal(data.getBytes());
        result= rawHmac.encodeBase64()
    } catch (Exception e) {
        throw new SignatureException("Failed to generate HMAC : " + e.getMessage());
    }
    return result
}

For this part, I will need to add some imports at the beginning of my script body:

import java.security.SignatureException
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec

Then I retrieve all the attributes of my FF and extract the ones I will need to construct my parameter string and signature:

def attributes = flowFile.getAttributes()
// retrieve arguments of the target and split arguments
def arguments = attributes.arguments.tokenize('&')
def method = attributes.method
def base_url = attributes.base_url
def consumerSecret = attributes.oauth_consumer_secret
def tokenSecret = attributes.oauth_token_secret

Then I create a TreeMap to which I add all the parameters needed to construct my parameter string. A TreeMap ensures that entries are sorted by key in alphabetical order.

TreeMap map = [:]

for (String item : arguments) {
        def (key, value) = item.tokenize('=')
        map.put(key, value)
}

map.put("oauth_consumer_key", attributes.oauth_consumer_key)
map.put("oauth_nonce", attributes.oauth_nonce)
map.put("oauth_signature_method", attributes.oauth_signature_method)
map.put("oauth_timestamp", attributes.oauth_timestamp)
map.put("oauth_token", attributes.oauth_token)
map.put("oauth_version", attributes.oauth_version)

Then I add a method to the String class to allow percent encoding on String objects:

String.metaClass.encode = {
    java.net.URLEncoder.encode(delegate, "UTF-8").replace("+", "%20").replace("*", "%2A").replace("%7E", "~");
}

I am now able to construct the parameter string:

String parameterString = ""

map.each { key, value ->
    parameterString += key.encode()
    parameterString += '='
    parameterString += value.encode()
    parameterString += '&'
}

parameterString = parameterString.substring(0, parameterString.length()-1);

Update: the code above can be simplified as below (see Andy’s comment)

String parameterString = map.collect { String key, String value ->
    "${key.encode()}=${value.encode()}"
}.join("&")

It is now possible to get the signature:

String signatureBaseString = ""
signatureBaseString += method.toUpperCase()
signatureBaseString += '&'
signatureBaseString += base_url.encode()
signatureBaseString += '&'
signatureBaseString += parameterString.encode()

String signingKey = consumerSecret.encode() + '&' + tokenSecret.encode()
String oauthSignature = hmac(signatureBaseString, signingKey)

I may add this information as a new attribute of my FF:

flowFile = session.putAttribute(flowFile, 'oauth_signature', oauthSignature)

Then I can construct the header property value to associate to Authorization key:

String oauth = 'OAuth '
oauth += 'oauth_consumer_key="'
oauth += attributes.oauth_consumer_key.encode()
oauth += '", '
oauth += 'oauth_nonce="'
oauth += attributes.oauth_nonce.encode()
oauth += '", '
oauth += 'oauth_signature="'
oauth += oauthSignature.encode()
oauth += '", '
oauth += 'oauth_signature_method="'
oauth += attributes.oauth_signature_method.encode()
oauth += '", '
oauth += 'oauth_timestamp="'
oauth += attributes.oauth_timestamp.encode()
oauth += '", '
oauth += 'oauth_token="'
oauth += attributes.oauth_token.encode()
oauth += '", '
oauth += 'oauth_version="'
oauth += attributes.oauth_version.encode()
oauth += '"'

I add this information as an attribute (that will be used in the InvokeHTTP processor as we saw before) and I forward my FF to the success relationship:

flowFile = session.putAttribute(flowFile, 'oauth_header', oauth)
session.transfer(flowFile, REL_SUCCESS)

That’s it: I have an operational flow implementing OAuth 1.0A protocol to request against the Twitter API.

The full code is available here as a gist.
The NiFi template is available here.

As always, feel free to ask questions and comment this post!

 

Analyze Flickr user interests using Apache NiFi and Spark

Let’s have some fun with Apache NiFi by studying a new use case: analyze a Flickr account to get some information regarding what kind of pictures the owner likes.

Before getting started, have a look at my previous posts if this is the first time you hear about NiFi. Also, let me remind you that Flickr is a website where people can share their pictures. In addition, users can add pictures of other users as “favorites” (similar to a like on Facebook) and add other users as “contacts” to get all their recently published photos in their personal feed.

OK, so now I assume you know a little about Flickr. The nice thing is that the website exposes an API to exchange information with it. To use it you need to request an API key. Once done, you’ll have a key and a secret passphrase. You will need this information to request Flickr through Apache NiFi.

Let’s move to the global idea of what we are going to do: for a given user account, we are going to get all the photos published by this user, all the photos liked (favorites) by this user, and the most recent photos published by the users followed by our guy. Next we will get all the tags and groups associated with each photo. We will aggregate all this data and perform a word count to get an idea of what our initial user is looking for when using Flickr. Does it sound interesting? OK, let’s go then.

First, if you want details about the Flickr API and all the exposed methods, go to this page. In our case, we are only going to use methods that do not require authentication (we are not going to fetch data that requires the authorization of the concerned user). The methods are:

  • flickr.people.getPhotos : to get photos published by a given user
  • flickr.favorites.getList : to get favorite photos of a given user
  • flickr.photos.getContactsPublicPhotos : to get recently published photos by contacts of a given user
  • flickr.tags.getListPhoto : to get the list of tags for a given photo
  • flickr.photos.getAllContexts : to get the list of groups (or pools) for a given photo

Note: regarding the photos published by contacts, we are only going to fetch the last 50 photos published by each contact of our user. It would be possible to work around this and get all the pictures of each contact, but this could represent millions of photos (and consequently millions of API calls) and it could potentially skew the results of the analysis, since the number of published photos can vary a lot between users, both in quantity and in frequency. So let’s focus on a sample of the most recent pictures.

To “launch” our flow, I will start with a GenerateFlowFile processor to have only one Flow File (FF) created. Then I will use an UpdateAttribute processor to add attributes to my FF in order to store global properties that will be used along the flow:

  • User ID (or NSID) of the user account we want to analyze
  • API key to use
  • Page (set to 1) that will be used to handle pagination of answers
  • Format (set to json) to get JSON answers from the Flickr API

flickr_properties
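In plain text, and using the attribute names referenced by the Remote URLs later in the flow, these properties boil down to:

userId: <NSID of the account to analyze>
apiKey: <your Flickr API key>
page: 1
responseFormat: json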

Then I use InvokeHTTP processors to perform calls to the Flickr API. It will be almost the same for each processor of this kind, so I will only describe one: the processor used to get all the favorite pictures of our user.

The only parameter I am going to change is the Remote URL. This parameter accepts the Expression Language, so I can reference attributes of my FF. I set the Remote URL to:

https://api.flickr.com/services/rest/?method=flickr.favorites.getList&api_key=${apiKey}&user_id=${userId}&per_page=500&page=${page}&format=${responseFormat}

As you can see, I give the method of the API I want to request, as well as the expected parameters for this method: the API key, the user ID, the number of elements per page returned by the API (500 is the maximum authorized), the page number I want to request, and the returned format.

The result will look like:

jsonFlickrApi({"photos":{"page":1,"pages":10,"perpage":500,"total":"4559","photo":[{"id":"26164116671","owner":"100524190@N04","secret":"3f80e32734","server":"1653","farm":2,"title":"STAIRWAY TO HEAVEN","ispublic":1,"isfriend":0,"isfamily":0,"date_faved":"1459772801"}]},"stat":"ok"})

As you can see, the Flickr API returns a string which is not directly a ready-to-use JSON string. So I add an intermediary step where I remove “jsonFlickrApi(” at the beginning and “)” at the end of the FF content, using two ReplaceText processors (see the sketch below).
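As a sketch (the exact regular expressions are mine, not necessarily the ones used originally), the two ReplaceText processors could be configured in “Entire text” evaluation mode with:

first processor: Search Value = ^jsonFlickrApi\( and Replacement Value = empty string
second processor: Search Value = \)$ and Replacement Value = empty string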

Then I want a system to handle the pagination aspect. For that I create a process group with one input port where I’ll send my FF, and with two output ports (I’ll come back to this very soon).

In this process group, I first use an EvaluateJsonPath processor to extract “page” and “pages” from the JSON string and get the values as attributes of my FF (you will notice that this updates my already existing attribute “page”).

evaluate_page

 

Then I use a RouteOnAttribute processor and I use the Expression language to check if the current page number (attribute “page”) is equal to the total number of pages (attribute “pages”).

page_equal_pages

Whether it is true or not, I route my FF to the first output port, which sends my FF to the next step of the flow. But if it is false, I also use an UpdateAttribute processor to increment the “page” attribute and get the new page number I must request:

increment

Note: I also update the “filename” attribute to ensure my FFs have different filenames. It is useful when using a PutFile processor along the flow for debug purposes.
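For reference, the Expression Language involved in these two steps is roughly the following (a sketch based on the page/pages attributes extracted above):

RouteOnAttribute (last page reached?): ${page:equals(${pages})}
UpdateAttribute (new value of page): ${page:toNumber():plus(1)}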

Then I send this FF to the second output port, which goes back to the InvokeHTTP processor but with a new page number. This way, it will loop over all pages to get all the expected results.

The process group looks like:

processor_group

I use the same logic for the photos published by the user and for the photos published by the user’s contacts. At this point, I can use a funnel to merge all my results: I am ready to continue at the photo level. So far the flow looks like:

flow_first_part

From this point, I use a SplitJson processor to get individual photo information:

splitjson

Then I use an EvaluateJsonPath processor to extract the photo ID of each photo and get it as an attribute of my FF:

extractPhotoId

I am now able to call the two methods to get the tags and groups associated with the picture, using Remote URLs built on the same pattern as before.
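The original URLs are not reproduced here, but based on the pattern above they would presumably look like this (assuming the photo ID extracted in the previous step was stored in an attribute named photoId):

https://api.flickr.com/services/rest/?method=flickr.tags.getListPhoto&api_key=${apiKey}&photo_id=${photoId}&format=${responseFormat}
https://api.flickr.com/services/rest/?method=flickr.photos.getAllContexts&api_key=${apiKey}&photo_id=${photoId}&format=${responseFormat}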

Then I clean the returned string once more to get a correct JSON string, and I again use SplitJson processors to get each tag and group into individual FFs.

Splitting list of tags:

splittags

Splitting list of groups:

splitgroup

Then I use an EvaluateJsonPath processor to extract the tag name or group name as my new FF content:

tagname

In the end, I use a MergeContent processor to concatenate my FFs into bigger FFs, with one tag/group name per line. For this processor I use the following parameters:

mergecontent

Note: for the demarcator parameter, I set Shift+Enter for the carriage return.

This way, I get FFs of 20k lines, unless the incoming queue did not reach this limit within 5 minutes.

I end my flow with a PutFile processor to store the results in a given directory.

The second part of the flow looks like:

flow_snd_part

That’s it! I have completed all the flow!

I ran this flow for a given user; it represented about 100k photos with about 450k tag/group names to analyze. To do that, I used Apache Spark and the famous word count example to extract the most used keywords.

Once pyspark was launched, I used the following commands:

import re
data = sc.textFile("D:/tmp/*")
(data.flatMap(lambda line: re.sub('[^0-9a-zA-Z ]+', '', line).lower().split())
     .map(lambda word: (word, 1))
     .reduceByKey(lambda a, b: a + b)
     .map(lambda (x, y): (y, x))
     .sortByKey(False)
     .take(200))

You will notice that I did some cleaning on the characters and changed my strings to lower case. Once I have the 200 most used words, I do some manual cleaning to remove unwanted words like “flickr”, “photo”, “the”, etc. And I finally get the 20 most used words related to the pictures I got through my flow for my given user:

word_count_result

I hope you enjoyed going through this use case, and I am sure it will give you some ideas to have fun with Apache NiFi ! Please feel free to comment or to ask questions about this post.

Get data from/to Dropbox using Apache NiFi

A few days ago, on the mailing list, a question was asked regarding the possibility to retrieve data from a smartphone using Apache NiFi. One suggestion was to use a cloud sharing service as an intermediary, like Box, Dropbox, Google Drive, AWS, etc.

Obviously, solutions already exist to sync data from these services onto your computer. But we could imagine using Apache NiFi to sync data between such services, or even better: to have an account on each of these services and use Apache NiFi as a single endpoint to use and manage all your accounts as a unique cloud storage facility. There are a lot of options out there.

For this post, let’s just use Apache NiFi to get data from/to a Dropbox account. I let you guys use your imagination to develop amazing things!

First, I need to allow my account to be accessed by an external application. To do that, I go to this page and click on the button “Create App”. Then I choose the “Dropbox API” and, in my case, I select the “App folder” type of access (only one directory will be accessible to my application). In the end I choose the name of my application, “NIFI”. This will automatically create a folder Applications/NIFI in my Dropbox (this will be seen as the root directory of your Dropbox account from your application’s point of view: see it as “/”).

Now you need to generate an access token for your application. On the page of your application you have a link to generate the token. Once generated, keep it securely for yourself!

OK! We have everything we need! Just to be sure everything is working, you can check with curl that you have access:

curl https://api.dropbox.com/1/account/info -H "Authorization:Bearer <YOUR-ACCESS-TOKEN>"

Note: here is a post about the new Dropbox API.

Now let’s jump to Apache NiFi to have a flow to automatically download files from the Dropbox folder.

1. List content in Dropbox folder

This can be achieved using:

curl -X POST https://api.dropboxapi.com/2-beta-2/files/list_folder \
    --header "Authorization: Bearer <access-token>" \
    --header "Content-Type: application/json" \
    --data "{\"path\": \"\"}"

With NiFi, we have to use the InvokeHTTP processor. At the time of writing, it is necessary to use an incoming FlowFile to set the content to be sent with a POST request. I will shortly open an issue about that and, hopefully, it should be possible to directly set the body in NiFi 0.7.0.

OK, so to get the content of the request set to

{\"path\": \"\"}

a workaround is to use a local file on your computer and to get it with a GetFile processor.

In conclusion, to issue the request to list the content of the folder we need:

list_files

The GetFile processor has the following properties:

  • regarding the scheduling, I set it CRON-based with “*/5 * * * * ?” to issue the HTTP request every 5 seconds
  • regarding the properties:

get_file.PNG

You notice that I reference a specific file in D:\tmp. This file is named list_files.json and its content is:

{"path": ""}

Also, I configure the processor not to delete the file (Keep Source File = true).

Every 5 seconds, a FlowFile with the expected content will be sent to the InvokeHTTP processor which has the following properties:

list_invokehttp.png

I set the method to POST and the URL to the endpoint specified by the Dropbox API. I manually set the Content-Type to “application/json” (the body will be the content of the incoming FlowFile). And I manually add a property with the key “Authorization” and the value “Bearer <Access Token>” (to be replaced with your own access token). This will add the expected header property.
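To summarize, the relevant InvokeHTTP properties are along these lines (the access token being yours):

HTTP Method: POST
Remote URL: https://api.dropboxapi.com/2-beta-2/files/list_folder
Content-Type: application/json
Authorization (custom property): Bearer <your access token>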

That is it! At this point we are able to list the content of our Dropbox folder. The output of the InvokeHTTP processor (on the response relationship) will have a content looking like:

{"entries": 
[
{".tag": "file", "name": "file_3.txt", "path_lower": "/file_3.txt", "path_display": "/file_3.txt", "id": "id:EoMhAg5eUKAAAAAAAAAAAQ", "client_modified": "2016-03-11T12:35:31Z", "server_modified": "2016-03-11T12:35:31Z", "rev": "145c5a2f7", "size": 13}, 
{".tag": "file", "name": "file_1.txt", "path_lower": "/file_1.txt", "path_display": "/file_1.txt", "id": "id:qNPLoI0buzAAAAAAAAAAAg", "client_modified": "2016-03-11T12:35:32Z", "server_modified": "2016-03-11T12:35:32Z", "rev": "245c5a2f7", "size": 13}, 
{".tag": "file", "name": "file_2.txt", "path_lower": "/file_2.txt", "path_display": "/file_2.txt", "id": "id:ib7z-SMEqYAAAAAAAAAAAQ", "client_modified": "2016-03-11T12:35:33Z", "server_modified": "2016-03-11T12:35:33Z", "rev": "345c5a2f7", "size": 13}
], 
"cursor": "AAG4qAdoMOQVhshkMwFchOjPUnMWuGIvrQkZMb3L1aFa9euMXonxsG0S0_RIfxfFxYMxoz2cKFqc3laWcyDdM5MixrJ3AZ4jAyebx5s70k69z6KrBTE_IUh4Vnd2UZCUIAA", "has_more": false}

We can see the three files I have added in my Dropbox folder.

2. Prepare requests to download files

Let’s have a look at the request we have to send to download a file:

curl -X POST https://api.dropboxapi.com/2-beta-2/files/download \
    --header "Authorization: Bearer <access-token>" \
    --header "Dropbox-API-Arg: {\"path\": \"/cupcake.png\"}"

So, we have to split our FlowFile to get one per file we want to download. Let’s do that with a SplitJson processor with the following properties:

split_json
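The key property here is the JsonPath expression pointing at the array to split; given the response shown above, it would presumably be:

JsonPath Expression: $.entries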

As we saw, our response is a JSON document containing an array (named “entries”) with one entry per file present in the Dropbox folder. This processor is designed to output one FlowFile per entry of the JSON array located at the given JSON Path. At this point, we will have one FlowFile per file we want to download. Each one will have as content something like:

{".tag": "file", "name": "file_3.txt", "path_lower": "/file_3.txt", "path_display": "/file_3.txt", "id": "id:EoMhAg5eUKAAAAAAAAAAAQ", "client_modified": "2016-03-11T12:35:31Z", "server_modified": "2016-03-11T12:35:31Z", "rev": "145c5a2f7", "size": 13}

Now let’s take advantage of the Expression Language of NiFi: we want to extract some values from this JSON content and add them as attributes. For that, we use the EvaluateJsonPath processor with the following properties:

evaluate_json_path

In this case I extract the value associated to “path_lower” in the JSON to set it to an attribute with the key “path_lower”. For example, an output FlowFile will have an attribute with the key “path_lower” and the value “/file_3.txt”.

I also override the attribute “filename” (this attribute already exists since it is a core attribute in NiFi). So far this attribute contained “list_files.json”, inherited from my initial FlowFile at the very beginning of the flow. I change it to the name of the file I want to download (otherwise my files would all be downloaded with the name “list_files.json”).
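Concretely, with the Destination of the processor set to flowfile-attribute, the two dynamic properties added on this EvaluateJsonPath processor would be something like:

path_lower: $.path_lower
filename: $.name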

The last thing to do before being able to call a new InvokeHTTP processor is to clean the FlowFile content. This is due to the fact that the Dropbox API is not expecting any content in the request and will return an error if there is.

This is done with a ReplaceText processor with the following properties:

replace_text

Here, I just delete all the content of my FlowFile to only keep the attributes.

3. Download files

We are now ready to configure a last InvokeHTTP processor to download our files.

Note: the Dropbox API is expecting a POST request with no Content-Type otherwise it will return an error. It is kind of a strange behavior but no choice here… It was not possible, with the processor, to have a POST request without Content-Type, so I issued a JIRA and proposed a fix that should be made available in NiFi 0.6.0.

Here are the properties of the processor:

download_invokehttp

I set the method to POST and the expected endpoint URL. I explicitly set the Content-Type to an empty string. And I manually add two properties for the headers: one for the authorization (to be replaced with your own access token) and one with the expected JSON (using the NiFi Expression Language to get the value of the “path_lower” attribute we extracted earlier) to specify the file to download.
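Putting it together, the properties are roughly:

HTTP Method: POST
Remote URL: https://api.dropboxapi.com/2-beta-2/files/download
Content-Type: (empty string)
Authorization (custom property): Bearer <your access token>
Dropbox-API-Arg (custom property): {"path": "${path_lower}"}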

The final step is to add a PutFile processor to put downloaded files into the wanted location. Here are the properties:

putfile

That’s all! We now have a full flow ready to download files from your Dropbox account!

download_flow.PNG

4. Upload files

From this point, it is easy to have the same logic and to upload files based on the needed request:

curl -X POST https://api.dropboxapi.com/2-beta-2/files/upload \
     --header "Authorization: Bearer <access-token>" \
     --header "Content-Type: application/octet-stream" \
     --header "Dropbox-API-Arg: {\"path\": \"/cupcake.png\", \"mode\": \"overwrite\"}" \
     --data-binary @local-file.png

We just need to link a similar GetFile processor (as described before) with a new InvokeHTTP processor with the following properties:

upload_file.png

This way, it will upload the incoming FlowFile with the correct file name at the root of your Dropbox directory (don’t forget to use your own Dropbox access token).

In conclusion, for upload, the flow is as simple as:

upload_flow.PNG

That is it! I think you have everything you need to enjoy NiFi if you want to play with it to handle content on your Dropbox account.

Here is the template of the full upload/download flow (don’t forget to update it with your access token).

Feel free to add comments and ask questions about this post!