Using the SmartSense Activity Explorer for cluster reporting

The Hortonworks Data Platform includes SmartSense, a service that analyzes cluster diagnostic data, identifies potential issues, and recommends specific solutions and actions.

SmartSense is made of multiple components. One of them is the Activity Explorer, a customized Zeppelin notebook used to access and display the data collected by the Activity Analyzer instances. This data is stored in an HBase instance and accessed using Phoenix.

The Activity Explorer gives access to a lot of very useful data when administering a cluster. For an exhaustive list, have a look at the documentation here.

By default, this Activity Explorer / Zeppelin is configured with the Phoenix interpreter only. The idea of this post is to describe how we can add the JDBC interpreter (or any other interpreter) to allow administration teams to use this specific Zeppelin instance as a more general tool for cluster reporting.

One might wonder why I’m not using the Zeppelin service available in the HDP stack. The reason is quite simple: usually, the Zeppelin instances would be deployed on the edge nodes (to be used by the project teams / users of the cluster) while the Activity Explorer would be deployed on an administration node and only accessed by the administrators of the cluster. The idea is to keep Zeppelin instances separated based on the purpose.

The first step is to package the JDBC interpreter. Go to the node where you installed the Zeppelin service (where all the interpreters are installed), not the node where you installed the Activity Explorer component.

cd /usr/hdp/current/zeppelin-server/interpreter/
zip -r jdbc.zip jdbc/

Then copy this ZIP file to the interpreter directory on the node where the Activity Explorer is installed, and unzip it there:

cd /usr/hdp/share/hst/activity-explorer/interpreter/
unzip jdbc.zip

Restart the Activity Explorer component so that the interpreter is available for configuration.

Go to the interpreter configuration page and add a new one, selecting the JDBC type. Configure the interpreter as needed based on your cluster (you can check the configuration you set for this interpreter in the Zeppelin service). In particular, you’ll need:

zeppelin.jdbc.auth.type=KERBEROS
zeppelin.jdbc.principal=<principal of the activity explorer>
zeppelin.jdbc.keytab.location=<keytab of the activity explorer>
hive.proxy.user.property=hive.server2.proxy.user

Note: do not use _HOST in the principal name, use the host FQDN instead.

I also strongly recommend configuring SSL on the Activity Explorer and setting up proper authentication/authorization mechanisms. You can do all that through Ambari as you’d do for the Zeppelin service (have a look at the documentation here).

Since the Activity Explorer account is going to proxy your requests to Hive through the JDBC interpreter, you need to add the proper proxy rules:

hadoop.proxyuser.activity_explorer.groups=<administrator group>
hadoop.proxyuser.activity_explorer.hosts=<activity explorer host>

And you’ll have to restart the appropriate services.

If you stop here and restart the Activity Explorer component, you’ll lose your JDBC interpreter configuration because all of the interpreter configuration of the Activity Explorer is managed by Ambari and reset at each component restart. To prevent the loss of your configuration, you need to copy the content of the file:

/etc/smartsense-activity/conf/interpreter.json

(content of this file has been updated by the Activity Explorer after you added the JDBC interpreter)

And paste this content in Ambari / SmartSense / Advanced / Advanced activity-zeppelin-interpreter. This way, your configuration will remain the same.
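Before pasting, it can be worth checking that the file really contains the new interpreter. The sketch below is only an illustration: it assumes interpreter.json has a top-level "interpreterSettings" object whose entries carry "name" and "group" fields, which is the usual Zeppelin layout but may differ between versions. The helper name interpreter_groups is hypothetical.

```python
import json


def interpreter_groups(interpreter_json_text):
    """Return the sorted interpreter groups found in an interpreter.json document.

    Assumption: settings live under "interpreterSettings", keyed by an ID,
    and each entry has a "group" (or at least a "name") field.
    """
    settings = json.loads(interpreter_json_text).get("interpreterSettings", {})
    groups = set()
    for entry in settings.values():
        groups.add(entry.get("group") or entry.get("name") or "unknown")
    return sorted(groups)


# Example usage on the Activity Explorer node:
#   with open("/etc/smartsense-activity/conf/interpreter.json") as handle:
#       print("jdbc" in interpreter_groups(handle.read()))
```

If "jdbc" is missing from the result, the file was probably read before the Activity Explorer saved the new interpreter.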

Note: keep in mind that this whole procedure might have to be done again after a SmartSense upgrade since it’s not the default deployment.

You’re now all set! If you’re wondering what can be done with the JDBC interpreter to enhance cluster administration tasks, the first thing I can recommend is to create Hive tables on top of the Ranger audits stored in HDFS so that you can build long-term reports based on all the cluster audits (if you’re using Solr for the Ranger audits, this data is only kept for a limited period, 90 days by default). Creating Hive tables on top of the data sitting in HDFS can be really useful if you have compliance/security teams looking for audit reporting.

You could also use the JDBC interpreter to directly access the data in the database backend used for some services like Ambari, Ranger, Hive, etc. It can provide interesting data to build useful reports.

As always, thanks for reading, and feel free to ask questions / leave a comment.

Monitoring NiFi – Ambari & Grafana

Note – This article is part of a series discussing subjects around NiFi monitoring.

When using Apache NiFi (note that version 1.2.0 is now released!) as part of HDF, a lot of things are simplified by using Apache Ambari to deploy NiFi and manage its configuration. Also, using the Ambari Metrics service and Grafana, you have a way to easily and visually monitor NiFi performance. And you can also use Apache Ranger to centralize authorization management for multiple components (NiFi, Kafka, etc.) in one single place.

This article will discuss how you can use Ambari Metrics and Grafana to improve your NiFi monitoring. Let’s start with a quick discussion around AMS (Ambari Metrics System). By default this service is running a Metrics Collector with an embedded HBase instance (and a Zookeeper instance) to store all the metrics, and Ambari will also deploy Metrics Monitor instances on all the nodes of the cluster. The monitors will collect the metrics at system level and send the metrics to the collector. However, the collector also exposes a REST API and that’s what NiFi is going to use with the AmbariReportingTask.

GrafanaBlogOverview

The source and documentation are on the Hortonworks website here.

When using HDF, the Ambari reporting task should already be up and running for you. If not, you can add it and configure it with a frequency of one minute (it does matter) and use the following parameters:

Screen Shot 2017-05-11 at 9.30.46 AM

Note that “ambari.metrics.collector.url” is an environment variable already set for you when Ambari is starting NiFi. You could also directly give the address, in my case:

http://pvillard-hdf-1:6188/ws/v1/timeline/metrics

Once this reporting task is up and running, you should be able to see the metrics on the NiFi service page in Ambari:

Screen Shot 2017-05-11 at 9.38.57 AM.png

Also, you can go into Grafana to display dashboards with the metrics of your components. You have pre-configured dashboards and here is the one for NiFi:

Screen Shot 2017-05-11 at 9.46.03 AM.png

Now, all the metrics we have here are at cluster level. We are not able to display metrics for specific workflows. With the latest release of Apache NiFi (1.2.0), there is now an optional parameter in the AmbariReportingTask to specify a process group ID. This way, by creating a second reporting task (keep the one providing cluster-level metrics) and by specifying the ID of a specific process group, you can actually create your Grafana dashboards at workflow level.

Let’s say I’ve the following workflow:

Screen Shot 2017-05-11 at 9.52.49 AM

And inside my process group, I have:

Screen Shot 2017-05-11 at 9.52.59 AM.png

Now, since my process group has the ID “75973b6e-2d38-1cf3-ffff-fffffdea8cbc”, I can define the following Ambari reporting task:

Screen Shot 2017-05-11 at 9.54.50 AM.png

Note – you must keep “nifi” as the Application ID as it has to match the configuration of the Ambari Metrics System.

Once your reporting task is running, in Grafana, you can create your own dashboard for this workflow and display the metrics you want:

Screen Shot 2017-05-11 at 10.08.59 AM.png

For my Kafka example, here is the dashboard I defined:

Screen Shot 2017-05-11 at 10.39.47 AM.png

In this example, I can see that my workflow is running fine but the free disk space on one of my nodes is decreasing very quickly. It turns out that when my disk is completely filled, back pressure will be enabled in my workflow and no more data is sent to Kafka. Instead, data is queued in NiFi.

This simple example gives me a lot of information:

  • Everything is default configuration in Ambari and I chose my three NiFi nodes to also host Kafka brokers. By default, for Kafka, the replication factor is set to 1, the number of partitions is set to 1 and the automatic creation of topics is allowed (that’s why I didn’t need to create the topic before starting my workflow). Because of the default parameters, all of the data is sent to only one Kafka broker (pvillard-hdf-2) and that’s why the disk space is quickly decreasing on this node since my three NiFi nodes are sending data to this broker.
  • Also, we clearly see that it’s not a good idea to co-locate NiFi and Kafka on the same nodes since they are both IO intensive. In this case, they are using the same disk… and we can see that the task duration (for NiFi) is clearly higher on the Kafka node that is receiving the data (pvillard-hdf-2). Long story short: keep NiFi and Kafka on separate nodes (or at the very least on different disks).

HDF and the Ambari Metrics System give you the ability to create custom, relevant dashboards for specific use cases. They also allow you to mix information from Kafka, from NiFi and from the hosts to have all the needed information in one single place.

Also, by using the REST API of the Metrics Collector (you may be interested in this article), you could send your own data (not only the data gathered at the process group level) to add more information to your dashboards. An example that comes to mind would be to send the lineage duration (see Monitoring of Workflow SLA) at the end of the workflow using an InvokeHTTP processor that sends a JSON payload in a POST request to the API endpoint.

Let’s say I want to monitor how long it takes between my GenerateFlowFile and the end of my workflow to check if some particular events are taking longer. Then I could have something like:

Screen Shot 2017-05-11 at 5.58.02 PM.png

What am I doing here? I want to send to AMS the information about the lineage duration of the flow files I sent into my Kafka topic. However, I don’t want to send the duration of every single event (that’s not really useful and it’s going to generate a lot of requests/data). Instead I want to make an API call only once per minute. The idea is to compute the mean and max of the lineage duration with a rolling window of one minute and to only send these two values to AMS.

I could use the new AttributeRollingWindow processor but it is not as fast as the PublishKafka and I don’t want to generate back pressure in my relationships. So I use the InvokeScriptedProcessor to build my own rolling processor (it’s faster because I am not using any state information):

  • this processor takes a frequency duration as a parameter (that I’ll set to 1 minute in this example)
  • for every flow file coming in, it extracts the lineage start date to compute the max and mean lineage duration over the rolling window. If the last flow file sent to the success relationship was less than one minute ago, I route the flow file to the drop relationship (that I set to auto-terminated). If it was more than one minute ago, I update the attributes of the current flow file with the mean and max of all the flow files since the last “success” flow file and route this flow file to the success relationship

Since I have flow files coming into my processor at a high rate, I know that my processor will release one flow file every minute with the mean and max of the lineage duration for the flow files of the last minute.
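To make the rolling logic more concrete, here is a minimal Python sketch of the same idea. It is not the Groovy script used in the flow: the class name, the method name and the handling of the very first flow file (it only opens the window) are assumptions made for the illustration.

```python
class RollingLineageWindow:
    """Aggregate lineage durations and release one summary per window."""

    def __init__(self, frequency_ms):
        self.frequency_ms = frequency_ms
        self.last_release_ms = None
        self._reset()

    def _reset(self):
        self.count = 0
        self.total_ms = 0
        self.max_ms = 0

    def on_flow_file(self, now_ms, lineage_start_ms):
        """Return {'mean': ..., 'max': ...} for 'success', or None for 'drop'."""
        duration_ms = now_ms - lineage_start_ms
        self.count += 1
        self.total_ms += duration_ms
        self.max_ms = max(self.max_ms, duration_ms)
        if self.last_release_ms is None:
            # Assumption: the first flow file only opens the window.
            self.last_release_ms = now_ms
            return None
        if now_ms - self.last_release_ms < self.frequency_ms:
            # Less than one window since the last release: drop.
            return None
        summary = {"mean": self.total_ms / self.count, "max": self.max_ms}
        self.last_release_ms = now_ms
        self._reset()
        return summary
```

Only counters are kept in memory (count, sum and max), so no per-flow-file history has to be stored between two releases.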

Then I use a ReplaceText processor to construct the JSON payload that I’ll send to the Metrics Collector using the InvokeHttp processor.

Here is the configuration of the InvokeScriptedProcessor:

Screen Shot 2017-05-11 at 7.54.00 PM

The Groovy script used can be found here.

Then I create the JSON payload with the ReplaceText processor:

Screen Shot 2017-05-11 at 7.56.33 PM.png

Note that I use the ID of the processor as the “instanceid” attribute in the JSON.
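For readers who cannot see the screenshot, here is a hedged sketch of what such a payload could look like, written in Python rather than with ReplaceText. The field names follow my understanding of the Metrics Collector API and should be checked against your AMS version; the metric name and the function name are hypothetical.

```python
import json


def ams_payload(metric_name, value, timestamp_ms, hostname, instance_id, app_id="nifi"):
    """Build the JSON body for a single data point.

    Assumption: the collector expects a "metrics" list whose entries hold
    the metric name, application ID, instance ID, host, and a map of
    timestamp (in milliseconds) to value.
    """
    return json.dumps({
        "metrics": [{
            "metricname": metric_name,
            "appid": app_id,
            "instanceid": instance_id,
            "hostname": hostname,
            "timestamp": timestamp_ms,
            "starttime": timestamp_ms,
            "metrics": {str(timestamp_ms): value},
        }]
    })


# Example: mean lineage duration reported by the scripted processor.
body = ams_payload("lineage.duration.mean", 150, 1494525600000,
                   "pvillard-hdf-1", "processor-id-goes-here")
```

The resulting string is what would be sent as the body of the POST request to the /ws/v1/timeline/metrics endpoint.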

Then, I use the InvokeHttp processor (with a scheduling/frequency of 1 minute):

Screen Shot 2017-05-11 at 7.57.45 PM.png

Now, I can use this information to build the corresponding graph in my Grafana dashboard:

Screen Shot 2017-05-11 at 7.59.27 PM

I can see that, on average, it takes about 150 milliseconds to generate my flow file, publish it in my Kafka topic and get it into my scripted processor. I could also generate one metric per host of my cluster to check if a node is performing badly compared to the others.

Now you can easily send your custom data into AMS and create dashboards for your specific use cases and workflows.

As usual, feel free to ask questions and comment on this post.

Monitoring NiFi – Introduction

Apache NiFi 1.2.0 has just been released with a lot of very cool new features… and I take this opportunity to start a series of articles around monitoring. This is a recurring subject and I often hear the same questions. This series won’t provide an exhaustive list of the ways you can use to monitor NiFi (with or without HDF) but, at least, it should get you started!

Here is a quick summary of the subjects that will be covered:

For this series of articles, I will use, as a demo environment, a 4-node HDF cluster (running on CentOS 7):

I’m using HDF to take advantage of Ambari to ease the deployment but this is not mandatory for what I’m going to discuss in the articles (except for stuff around the Ambari reporting task obviously).

I will not cover how to set up this environment, but if this is something you are looking for, feel free to ask questions (here or on the Hortonworks Community Connection) and to have a look at the Hortonworks documentation about HDF.

I don’t want to write a single (very) long article, so for the sake of clarity there is one article per listed subject. Also, I’ll try to update the articles over time to keep up with the latest features provided by NiFi.

Also, if you feel that some subjects should be added to the list, let me know and I’ll do my best to cover other monitoring-related questions.

Apache NiFi 1.0.0 – Cluster setup

As you may know, version 1.0.0-BETA of Apache NiFi was released a few days ago. The upcoming 1.0.0 release will be a great moment for the community as it will mark a lot of work over the last few months, with many new features being added.

The objective of the Beta release is to give people a chance to try this new version and to give feedback before the official major release, which will come shortly. If you want to preview this new version with a completely new look, you can download the binaries here, unzip them, and run NiFi (‘./bin/nifi.sh start’ or ‘./bin/run-nifi.bat’ for Windows). Then you just have to access http://localhost:8080/nifi/.

The objective of this post is to briefly explain how to set up an unsecured NiFi cluster with this new version (a post about setting up a secured cluster will come shortly, with explanations on how to use a new tool that will be shipped with NiFi to ease the installation of a secured cluster).

One really important change with this new version is the new paradigm around cluster installation. From the NiFi documentation, we can read:

Starting with the NiFi 1.0 release, NiFi employs a Zero-Master Clustering paradigm. Each of the nodes in a NiFi cluster performs the same tasks on the data but each operates on a different set of data. Apache ZooKeeper elects one of the nodes as the Cluster Coordinator, and failover is handled automatically by ZooKeeper. All cluster nodes report heartbeat and status information to the Cluster Coordinator. The Cluster Coordinator is responsible for disconnecting and connecting nodes. As a DataFlow manager, you can interact with the NiFi cluster through the UI of any node in the cluster. Any change you make is replicated to all nodes in the cluster, allowing for multiple entry points to the cluster.

zero-master-cluster

OK, let’s start with the installation. As you may know, it is strongly recommended to use an odd number of ZooKeeper instances, with at least 3 nodes (to maintain a majority, also called a quorum). NiFi comes with an embedded instance of ZooKeeper, but you are free to use an existing cluster of ZooKeeper instances if you want. In this article, we will use the embedded ZooKeeper option.
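The reason behind the odd number is simple majority arithmetic, illustrated by the short Python sketch below (the function names are mine): an ensemble stays available as long as more than half of its members are up, so adding a fourth instance to three does not let you survive more failures.

```python
def quorum(ensemble_size):
    """Smallest number of instances that forms a strict majority."""
    return ensemble_size // 2 + 1


def tolerated_failures(ensemble_size):
    """Number of instances that can fail while a majority remains."""
    return ensemble_size - quorum(ensemble_size)


for size in (3, 4, 5):
    print(size, "instances: quorum", quorum(size),
          "- tolerated failures", tolerated_failures(size))
```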

I will use my computer as the first instance. I also launched two virtual machines (with a minimal CentOS 7). All three instances are able to communicate with each other on the required ports. On each machine, I configure my /etc/hosts file with:

192.168.1.17 node-3
192.168.56.101 node-2
192.168.56.102 node-1

I deploy the binary archive on my three instances and unzip it. I now have a NiFi directory on each of my nodes.

The first thing is to configure the list of the ZK (ZooKeeper) instances in the configuration file ‘./conf/zookeeper.properties‘. Since our three NiFi instances will run the embedded ZK instance, I just have to complete the file with the following properties:

server.1=node-1:2888:3888
server.2=node-2:2888:3888
server.3=node-3:2888:3888

Then, everything happens in the ‘./conf/nifi.properties‘. First, I specify that NiFi must run an embedded ZK instance, with the following property:

nifi.state.management.embedded.zookeeper.start=true

I also specify the ZK connect string:

nifi.zookeeper.connect.string=node-1:2181,node-2:2181,node-3:2181

As you can notice, the ./conf/zookeeper.properties file has a property named dataDir. By default, this value is set to ./state/zookeeper. If more than one NiFi node is running an embedded ZK, it is important to tell the server which one it is.

To do that, you need to create a file named myid and place it in ZK’s data directory. The content of this file should be the index of the server, as previously specified by the corresponding server.<index> property.

On node-1, I’ll do:

mkdir -p ./state/zookeeper
echo 1 > ./state/zookeeper/myid

The same operation needs to be done on each node (don’t forget to change the ID).
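If you have more than a handful of nodes, the ID can be derived from the server.N lines instead of being typed by hand. This is only a sketch under the assumption that the host names used in zookeeper.properties are the ones you pass in; the function names are hypothetical.

```python
import re
from pathlib import Path


def server_ids(properties_text):
    """Map each host to its index from 'server.<id>=<host>:<port>:<port>' lines."""
    ids = {}
    for line in properties_text.splitlines():
        match = re.match(r"\s*server\.(\d+)\s*=\s*([^:\s]+)", line)
        if match:
            ids[match.group(2)] = int(match.group(1))
    return ids


def write_myid(data_dir, host, properties_text):
    """Create data_dir if needed and write the myid file for the given host."""
    server_id = server_ids(properties_text)[host]
    directory = Path(data_dir)
    directory.mkdir(parents=True, exist_ok=True)
    (directory / "myid").write_text(f"{server_id}\n")
    return server_id


# Example, run from the NiFi directory on node-2:
#   text = Path("./conf/zookeeper.properties").read_text()
#   write_myid("./state/zookeeper", "node-2", text)
```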

If you don’t do this, you may see the following kind of exceptions in the logs:

Caused by: java.lang.IllegalArgumentException: ./state/zookeeper/myid file is missing

Then we go to clustering properties. For this article, we are setting up an unsecured cluster, so we must keep:

nifi.cluster.protocol.is.secure=false

Then, we have the following properties:

nifi.cluster.is.node=true
nifi.cluster.node.address=node-1
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

I set the FQDN of the node I am configuring, and I choose the arbitrary 9999 port for the communication with the elected cluster coordinator. I apply the same configuration on my other nodes:

nifi.cluster.is.node=true
nifi.cluster.node.address=node-2
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

and

nifi.cluster.is.node=true
nifi.cluster.node.address=node-3
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

We have configured the exchanges between the nodes and the cluster coordinator, now let’s move to the exchanges between the nodes (to balance the data of the flows). We have the following properties:

nifi.remote.input.host=node-1
nifi.remote.input.secure=false
nifi.remote.input.socket.port=9998
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec

Again, I set the FQDN of the node I am configuring and I choose the arbitrary 9998 port for the Site-to-Site (S2S) exchanges between the nodes of my cluster. The same applies for all the nodes (just change the host property with the correct FQDN).

It is also important to set the FQDN for the web server property, otherwise we may get strange behaviors with all nodes identified as ‘localhost’ in the UI. Consequently, for each node, set the following property with the correct FQDN:

nifi.web.http.host=node-1
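Since only the host name changes from one node to another, the per-node part of nifi.properties can be summarized with a small Python sketch. It simply renders the properties discussed above for a given node; the function names are hypothetical and the output still has to be merged into the real file by hand.

```python
NODES = ["node-1", "node-2", "node-3"]


def node_overrides(host, nodes=NODES):
    """Return the clustering properties of this article for one node."""
    return {
        "nifi.state.management.embedded.zookeeper.start": "true",
        "nifi.zookeeper.connect.string": ",".join(f"{n}:2181" for n in nodes),
        "nifi.cluster.protocol.is.secure": "false",
        "nifi.cluster.is.node": "true",
        "nifi.cluster.node.address": host,
        "nifi.cluster.node.protocol.port": "9999",
        "nifi.remote.input.host": host,
        "nifi.remote.input.secure": "false",
        "nifi.remote.input.socket.port": "9998",
        "nifi.web.http.host": host,
    }


def render(overrides):
    """Format the overrides as key=value lines."""
    return "\n".join(f"{key}={value}" for key, value in overrides.items())


for node in NODES:
    print(f"# {node}\n{render(node_overrides(node))}\n")
```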

And that’s all! Easy, isn’t it?

OK, let’s start our nodes and let’s tail the logs to see what’s going on there!

./bin/nifi.sh start && tail -f ./logs/nifi-app.log

If you look at the logs, you should see that one of the nodes gets elected as the cluster coordinator, and then you should see heartbeats created by the three nodes and sent to the cluster coordinator every 5 seconds.

You can connect to the UI using the node you want (you can have multiple users connected to different nodes, modifications will be applied on each node). Let’s go to:

http://node-2:8080/nifi

Here is what it looks like:

Screen Shot 2016-08-13 at 7.33.08 PM

As you can see in the top-left corner, there are 3 nodes in our cluster. Besides, if we go in the menu (button in the top-right corner) and select the cluster page, we have details on our three nodes:

Screen Shot 2016-08-13 at 7.35.28 PM

We see that my node-2 has been elected as cluster coordinator, and that my node-3 is my primary node. This distinction is important because some processors must run on a unique node (for data consistency) and in this case we will want it to run “On primary node” (example below).

We can display details on a specific node (“information” icon on the left):

Screen Shot 2016-08-13 at 7.35.48 PM

OK, let’s add a processor like GetTwitter. Since the flow will run on all nodes (with balanced data between the nodes), this processor must run on a single node if we don’t want to duplicate data. So, in the scheduling strategy, we will choose the strategy “On primary node”. This way, we don’t duplicate data, and if the primary node changes (because my node dies or gets disconnected), we won’t lose data: the workflow will still be executed.

Screen Shot 2016-08-13 at 7.45.19 PM

Then I can connect my processor to a PutFile processor to save the tweets in JSON by setting a local directory (/tmp/twitter):

Screen Shot 2016-08-13 at 7.52.25 PM

If I run this flow, all my JSON tweets will be stored on the primary node and the data won’t be balanced. To balance the data, I need to use an RPG (Remote Process Group). The RPG will connect to the coordinator to evaluate the load of each node and balance the data over the nodes. It gives us the following flow:

Screen Shot 2016-08-13 at 8.00.26 PM

I have added an input port called “RPG”, then I have added a Remote Process Group that I connected to “http://node-2:8080/nifi” and I enabled transmission so that the Remote Process Group was aware of the existing input ports on my cluster. Then in the Remote Process Group configuration, I enabled the RPG input port. I then connected my GetTwitter to the Remote Process Group and selected the RPG input port. Finally, I connected my RPG input port to my PutFile processor.

When running the flow, I now have balanced data all over my nodes (I can check in the local directory ‘/tmp/twitter‘ on each node).

That’s all for this post. I hope you enjoyed it and that it will be helpful if you are setting up a NiFi cluster. All comments/remarks are very welcome and I kindly encourage you to download Apache NiFi, to try it and to give feedback to the community if you have any.

US presidential election via Twitter using Apache NiFi, Spark, Hive and Zeppelin

This article describes a frequency and sentiment analysis based on real-time tweet streams in relation to the four main candidates in the US Presidential Election.

The main objective was to deploy and to test the available connector between Apache NiFi and Apache Spark, so I decided to implement the following use case:

In the end, I get real-time analytics such as:

  • frequency of tweets over time per candidate
  • percentage of negative, positive and neutral tweets per candidate
  • opinion trends over time for each candidate

The article is available on Hortonworks Community Connection website. And as always, please feel free to comment and/or ask questions.

OAuth 1.0A with Apache NiFi (Twitter API example)

A lot of APIs use the OAuth protocol to authorize the requests they receive and to verify the identity of the request sender.

OAuth is an open standard for authorization, commonly used as a way for Internet users to log into third party websites using their Microsoft, Google, Facebook, Twitter, One Network etc. accounts without exposing their password. Generally, OAuth provides to clients a “secure delegated access” to server resources on behalf of a resource owner. It specifies a process for resource owners to authorize third-party access to their server resources without sharing their credentials. Designed specifically to work with Hypertext Transfer Protocol (HTTP), OAuth essentially allows access tokens to be issued to third-party clients by an authorization server, with the approval of the resource owner. The third party then uses the access token to access the protected resources hosted by the resource server.

As a remark, there are two versions of the protocol currently used out there: 1.0A and 2.0. As far as I know, 1.0A is the most commonly used. I already faced the need to use OAuth 1.0A protocol with the Flickr API but, back then, I found a way to get my data differently.

Recently, a question was asked on the Hortonworks Community Connection regarding the use of Apache NiFi to get data from the Twitter API using the OAuth 1.0A protocol. So this time, I decided to have a look at this and to get the job done.

This post presents the flow I used to perform a request against the Twitter API using the OAuth protocol. It gives me the opportunity to use, for the first time, the ExecuteScript processor, which allows users to execute custom scripts on the fly inside NiFi (you will find a lot of examples on this great site).

Note 1: this was the first time I used the Groovy language, be nice to me!

Note 2: I didn’t test the flow on a lot of methods. Some modifications may be necessary for some cases.

OK. The objective was to request the “users/lookup” method of the Twitter API. You can read the documentation here.

I want to perform an HTTP GET on:

https://api.twitter.com/1.1/users/lookup.json?screen_name=twitterapi,twitter

So far it seems really easy to do with a simple InvokeHTTP processor. The thing is, you need to identify yourself when sending the request. This is where the OAuth protocol comes in. The official specification for 1.0A can be read here. But in the case of the Twitter API, there is nice documentation here.

Besides, on the documentation page of each method, you have an OAuth Signature Generator that can be accessed (if you have defined a Twitter App). The generator is here. It lets you play around and gives great insights on each request to debug your own implementation of the OAuth protocol.

The general idea is: you have a lot of input parameters and you must follow the specification to construct a string based on these parameters. This string will be the value associated with the “Authorization” key in the HTTP header properties.

Here is the list of the needed parameters. First the parameters directly linked to your request:

Then the global parameters related to OAuth:

  • Consumer key (private information of your app provided by Twitter)
  • Consumer secret (private information of your app provided by Twitter)
  • Nonce (random string, uniquely generated for each request)
  • Signature method (with Twitter it is HMAC-SHA1)
  • Timestamp (in seconds)
  • Token (private information of your app provided by Twitter)
  • Token secret (private information of your app provided by Twitter)
  • Version (in this case 1.0)

The first step is to construct the “signature base string”. For that you first need to create the “parameter string”. All of this is very well explained here. Once you have the signature base string, you can sign it using HMAC-SHA1 (with a key built from the consumer secret and the token secret) and you get the signature to include in the header property of your HTTP request:

Authorization: OAuth oauth_consumer_key="*******", oauth_nonce="a9ab2392e5158a4c4e029c7829164571", oauth_signature="4s4Hi5hQ%2FoLKprW7qsRlImds3Og%3D", oauth_signature_method="HMAC-SHA1", oauth_timestamp="1460453975", oauth_token="*******", oauth_version="1.0"
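Before going through the NiFi flow, the whole computation can be summarized with a compact Python sketch. It is not the Groovy script described in this post, only an illustration of the same steps under my reading of the OAuth 1.0A rules (percent-encode, sort, join, sign with HMAC-SHA1, Base64-encode); the function names and the credentials in the usage example are placeholders.

```python
import base64
import hashlib
import hmac
from urllib.parse import quote


def pct(value):
    """Percent-encode everything except letters, digits, '-', '.', '_' and '~'."""
    return quote(str(value), safe="~")


def authorization_header(method, base_url, query_params, oauth_params,
                         consumer_secret, token_secret):
    """Return the value of the Authorization header for one request."""
    # 1. Parameter string: encoded key=value pairs, sorted and joined with '&'.
    pairs = sorted((pct(k), pct(v))
                   for k, v in {**query_params, **oauth_params}.items())
    parameter_string = "&".join(f"{k}={v}" for k, v in pairs)
    # 2. Signature base string: METHOD & encoded URL & encoded parameter string.
    base_string = "&".join([method.upper(), pct(base_url), pct(parameter_string)])
    # 3. Signing key: encoded consumer secret & encoded token secret.
    signing_key = f"{pct(consumer_secret)}&{pct(token_secret)}"
    # 4. HMAC-SHA1 over the base string, then Base64.
    digest = hmac.new(signing_key.encode("utf-8"),
                      base_string.encode("utf-8"), hashlib.sha1).digest()
    signature = base64.b64encode(digest).decode("ascii")
    # 5. Header: the OAuth parameters plus the signature, each value quoted.
    header_params = dict(oauth_params, oauth_signature=signature)
    return "OAuth " + ", ".join(
        f'{pct(k)}="{pct(v)}"' for k, v in sorted(header_params.items()))


# Usage with placeholder credentials (not real keys):
header = authorization_header(
    "GET", "https://api.twitter.com/1.1/users/lookup.json",
    {"screen_name": "twitterapi,twitter"},
    {"oauth_consumer_key": "my-consumer-key",
     "oauth_nonce": "a9ab2392e5158a4c4e029c7829164571",
     "oauth_signature_method": "HMAC-SHA1",
     "oauth_timestamp": "1460453975",
     "oauth_token": "my-token",
     "oauth_version": "1.0"},
    "my-consumer-secret", "my-token-secret")
```

The OAuth Signature Generator mentioned above is a good way to compare the output of such a sketch with what Twitter expects for the same inputs.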

Let’s get into the details using Apache NiFi. Here is the flow I created:

oauthFlow

I use a GenerateFlowFile to generate an empty Flow File (FF) in order to execute my flow. Then I use an UpdateAttribute processor to add attributes to my FF. In this case, I only add the parameters related to the specific request I want to execute:

globalParam

Then I send my FF into a process group that will compute the header property to set (I will come back to this part later). Then I perform my request using the InvokeHTTP processor:

invokeHTTP

I set the method to GET, the URL to my corresponding FF attribute, the content type to text/plain and I add a custom property named Authorization with the FF attribute I created in my process group (see below). This custom property will be added as an HTTP header in the request. At the end, I use a PutFile processor to write the result of my request in a local directory.

Let’s go to the interesting part of our flow where all the magic is: the process group I named OAuth 1.0A. Here it is:

processGroup

I just use two processors. The first one is an UpdateAttribute to add all the parameters I need as attributes of my FF. The second one is an ExecuteScript processor where I put my Groovy code to compute the header property.

First… the parameters:

oauthParameters

Note: I use Expression Language provided by NiFi for some attributes.

  • arguments is used to extract the argument part in my target URL. In this example: screen_name=twitterapi,twitter
  • base_url is the URL I request without any argument. In this example: https://api.twitter.com/1.1/users/lookup.json
  • For the nonce parameter, I use the “UUID” method of the expression language, which generates a random string, and I just remove the ‘-’ characters to keep only an alphanumeric string.
  • For the timestamp, I use the “now” method of the expression language and I use substring to only keep seconds.
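For comparison, the same two values can be produced outside of NiFi. This Python sketch only mirrors the idea of the two expressions above (a UUID without dashes and the current time in seconds); it is not the Expression Language used in the processor, and the function names are mine.

```python
import time
import uuid


def oauth_nonce():
    """Random 32-character alphanumeric string (a UUID without the dashes)."""
    return uuid.uuid4().hex


def oauth_timestamp():
    """Current time as a string of whole seconds since the epoch."""
    return str(int(time.time()))


print(oauth_nonce(), oauth_timestamp())
```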

Let’s move to the ExecuteScript part. I set the script engine to Groovy and I put my script code in the “script body” property. The full code is available at the end of the post. Let’s go through it piece by piece.

First thing, I want to trigger my code only when a FF is available:

def flowFile = session.get()
if (!flowFile) return

Then I define a method I will use for the HMAC-SHA1 encoding:

def static hmac(String data, String key) throws java.security.SignatureException
{
    String result
    try {
        // build an HMAC-SHA1 key from the raw key bytes (explicit charset avoids platform-dependent results)
        SecretKeySpec signingKey = new SecretKeySpec(key.getBytes("UTF-8"), "HmacSHA1");
        // get an HmacSHA1 Mac instance and initialize it with the signing key
        Mac mac = Mac.getInstance("HmacSHA1");
        mac.init(signingKey);
        // compute the HMAC of the input data bytes
        byte[] rawHmac = mac.doFinal(data.getBytes("UTF-8"));
        // Base64-encode the raw HMAC to get the signature string
        result = rawHmac.encodeBase64().toString()
    } catch (Exception e) {
        throw new SignatureException("Failed to generate HMAC : " + e.getMessage());
    }
    return result
}

For this part, I will need to add some imports at the beginning of my script body:

import java.security.SignatureException
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec

Then I retrieve all the attributes of my FF. I split the request arguments and I set aside the attributes that are not part of the parameter string (the HTTP method, the base URL and the two secrets):

def attributes = flowFile.getAttributes()
// retrieve arguments of the target and split arguments
def arguments = attributes.arguments.tokenize('&')
def method = attributes.method
def base_url = attributes.base_url
def consumerSecret = attributes.oauth_consumer_secret
def tokenSecret = attributes.oauth_token_secret

Then I create a TreeMap in which I add all the parameters I need to construct my parameter string. A TreeMap guarantees that the entries are sorted by key in alphabetical order.

TreeMap map = [:]

for (String item : arguments) {
        def (key, value) = item.tokenize('=')
        map.put(key, value)
}

map.put("oauth_consumer_key", attributes.oauth_consumer_key)
map.put("oauth_nonce", attributes.oauth_nonce)
map.put("oauth_signature_method", attributes.oauth_signature_method)
map.put("oauth_timestamp", attributes.oauth_timestamp)
map.put("oauth_token", attributes.oauth_token)
map.put("oauth_version", attributes.oauth_version)

Then I add a method to the String class to allow percent encoding on String objects:

String.metaClass.encode = {
    java.net.URLEncoder.encode(delegate, "UTF-8").replace("+", "%20").replace("*", "%2A").replace("%7E", "~");
}
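For comparison, here is how the same percent encoding could look in Python. This is only a sketch with a helper name of my own; it assumes Python 3.7 or later, where `urllib.parse.quote` leaves ‘~’ unescaped.

```python
from urllib.parse import quote


def percent_encode(value):
    # safe="" means nothing beyond the RFC 3986 unreserved set
    # (letters, digits, '-', '.', '_', '~') is left unescaped,
    # so spaces become %20 and '*' becomes %2A.
    return quote(value, safe="")
```

The three `replace` calls of the Groovy version are not needed here because `quote` never produces ‘+’ for spaces and already escapes ‘*’.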

I am now able to construct the parameter string:

String parameterString = ""

map.each { key, value ->
    parameterString += key.encode()
    parameterString += '='
    parameterString += value.encode()
    parameterString += '&'
}

parameterString = parameterString.substring(0, parameterString.length()-1);

Update: the code above can be simplified as below (see Andy’s comment)

String parameterString = map.collect { String key, String value ->
    "${key.encode()}=${value.encode()}"
}.join("&")

It is now possible to get the signature:

String signatureBaseString = ""
signatureBaseString += method.toUpperCase()
signatureBaseString += '&'
signatureBaseString += base_url.encode()
signatureBaseString += '&'
signatureBaseString += parameterString.encode()

String signingKey = consumerSecret.encode() + '&' + tokenSecret.encode()
String oauthSignature = hmac(signatureBaseString, signingKey)
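To make the three parts of the signature base string easier to see, here is a compact Python sketch of the same steps (sorted parameters, base string, signing key, HMAC-SHA1). The function names are hypothetical and this is an illustration of the logic above, not a replacement for the Groovy script.

```python
import base64
import hashlib
import hmac
from urllib.parse import quote


def percent_encode(value):
    return quote(value, safe="")


def sign_request(method, base_url, params, consumer_secret, token_secret):
    # 1. Parameter string: key=value pairs, sorted by key, joined with '&'.
    parameter_string = "&".join(
        percent_encode(key) + "=" + percent_encode(value)
        for key, value in sorted(params.items())
    )
    # 2. Base string: METHOD & encoded URL & encoded parameter string.
    base_string = "&".join(
        [method.upper(), percent_encode(base_url), percent_encode(parameter_string)]
    )
    # 3. Signing key: encoded consumer secret & encoded token secret.
    signing_key = percent_encode(consumer_secret) + "&" + percent_encode(token_secret)
    digest = hmac.new(signing_key.encode("utf-8"), base_string.encode("utf-8"), hashlib.sha1).digest()
    return base_string, base64.b64encode(digest).decode("ascii")
```

Note that the parameter string is encoded a second time when it is placed in the base string, which is why a ‘,’ in an argument ends up as `%252C`.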

I may add this information as a new attribute of my FF:

flowFile = session.putAttribute(flowFile, 'oauth_signature', oauthSignature)

Then I can construct the value of the Authorization header:

String oauth = 'OAuth '
oauth += 'oauth_consumer_key="'
oauth += attributes.oauth_consumer_key.encode()
oauth += '", '
oauth += 'oauth_nonce="'
oauth += attributes.oauth_nonce.encode()
oauth += '", '
oauth += 'oauth_signature="'
oauth += oauthSignature.encode()
oauth += '", '
oauth += 'oauth_signature_method="'
oauth += attributes.oauth_signature_method.encode()
oauth += '", '
oauth += 'oauth_timestamp="'
oauth += attributes.oauth_timestamp.encode()
oauth += '", '
oauth += 'oauth_token="'
oauth += attributes.oauth_token.encode()
oauth += '", '
oauth += 'oauth_version="'
oauth += attributes.oauth_version.encode()
oauth += '"'
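The long concatenation above follows a simple pattern: each OAuth value is percent-encoded, wrapped in double quotes, and the pairs are joined with a comma and a space. As a sketch, in Python and with a hypothetical function name, this could be written as:

```python
from urllib.parse import quote


def build_oauth_header(oauth_values):
    # oauth_values maps names such as "oauth_nonce" to their raw values.
    parts = []
    for key in sorted(oauth_values):
        encoded = quote(oauth_values[key], safe="")
        parts.append('{}="{}"'.format(key, encoded))
    return "OAuth " + ", ".join(parts)
```

Sorting the keys alphabetically happens to give the same order as the Groovy code (consumer key, nonce, signature, signature method, timestamp, token, version).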

I add this information as an attribute (that will be used in the InvokeHTTP processor as we saw before) and I forward my FF to the success relationship:

flowFile = session.putAttribute(flowFile, 'oauth_header', oauth)
session.transfer(flowFile, REL_SUCCESS)

That’s it: I have an operational flow implementing the OAuth 1.0A protocol to send requests to the Twitter API.

The full code is available here as a gist.
The NiFi template is available here.

As always, feel free to ask questions and comment on this post!

 

Analyze Flickr user interests using Apache NiFi and Spark

Let’s have some fun with Apache NiFi by studying a new use case: analyze a Flickr account to get some information regarding what kind of pictures the owner likes.

Before getting started, have a look at my previous posts if this is the first time you hear about NiFi. Also, let me remind you that Flickr is a website where people can share their pictures. In addition, users can add other users’ pictures as “favorites” (similar to a like on Facebook) and add other users as “contacts” to get all their recently published photos in their personal feed.

OK, so now I assume you know a little about Flickr. The nice thing is that this website exposes an API to exchange information with it. To use it you need to request an API key. Once done, you’ll have a key and a secret passphrase. You will need this information to query Flickr through Apache NiFi.

Let’s move to the global idea of what we are going to do: for a given user account, we are going to get all the photos published by this user, all the photos liked (favorites) by this user, and the most recent photos published by the users this user follows. Next we will get all the tags and groups associated with each photo. We will aggregate all this data and perform a word count to get an idea of what our initial user is looking for when using Flickr. Does it sound interesting? OK, let’s go then.

First, if you want details about the Flickr API and all exposed methods, go to this page. In our case, we are going to only use methods that do not require authentication (we are not going to fetch data that requires the authorization of the concerned user). The methods are:

  • flickr.people.getPhotos : to get photos published by a given user
  • flickr.favorites.getList : to get favorite photos of a given user
  • flickr.photos.getContactsPublicPhotos : to get recently published photos by contacts of a given user
  • flickr.tags.getListPhoto : to get the list of tags for a given photo
  • flickr.photos.getAllContexts : to get the list of groups (or pools) for a given photo

Note: regarding the photos published by contacts, we are only going to fetch the last 50 photos published by each contact of our user. A workaround could fetch all the pictures of each contact, but this could represent millions of photos (and consequently millions of API calls), and it could skew the results of the analysis since both the number of published photos and the publishing frequency vary a lot between users. So let’s focus on a sample of the most recent pictures.

To “launch” our flow, I will start with a GenerateFlowFile processor to have only one Flow File (FF) created. Then I will use an UpdateAttribute processor to add attributes to my FF in order to store global properties that will be used along the flow:

  • User ID (or NSID) of the user account we want to analyze
  • API key to use
  • Page (set to 1) that will be used to handle pagination of answers
  • Format (set to json) to get JSON answers from the Flickr API

flickr_properties

Then I use InvokeHTTP processors to perform calls to the Flickr API. The configuration is almost the same for each processor of this kind, so I will only describe one. Let’s describe the processor used to get all favorite pictures of our user.

The only parameter I am going to change is the Remote URL. This parameter is accepting expression language so I can reference attributes of my FF. I set the Remote URL to:

https://api.flickr.com/services/rest/?method=flickr.favorites.getList&api_key=${apiKey}&user_id=${userId}&per_page=500&page=${page}&format=${responseFormat}

As you can see, I give the method of the API I want to request, and also the expected parameters for this method. In this case, these are the API key, the user ID, the number of elements per page returned by the API (500 is the maximum allowed), the page number I want to request, and the response format.
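To see how the Remote URL is assembled from the attributes, here is a small Python sketch that builds the same kind of URL. The helper name and the `KEY` placeholder are mine; the parameter names come from the URL above.

```python
from urllib.parse import urlencode

FLICKR_REST = "https://api.flickr.com/services/rest/"


def flickr_url(method, api_key, **params):
    # The method and API key always come first, then the
    # method-specific parameters in the order they were given.
    query = {"method": method, "api_key": api_key}
    query.update(params)
    return FLICKR_REST + "?" + urlencode(query)


url = flickr_url(
    "flickr.favorites.getList", "KEY",
    user_id="100524190@N04", per_page=500, page=1, format="json",
)
```

Unlike the Expression Language version, `urlencode` escapes special characters in the values, so the ‘@’ of the user ID is sent as `%40`.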

The result will look like:

jsonFlickrApi({"photos":{"page":1,"pages":10,"perpage":500,"total":"4559","photo":[{"id":"26164116671","owner":"100524190@N04","secret":"3f80e32734","server":"1653","farm":2,"title":"STAIRWAY TO HEAVEN","ispublic":1,"isfriend":0,"isfamily":0,"date_faved":"1459772801"}]},"stat":"ok"})

As you can see, the Flickr API returns a string which is not directly a ready-to-use JSON string. So I add an intermediary step where I remove “jsonFlickrApi(” at the beginning and “)” at the end of the FF content using two ReplaceText processors.
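For reference, the same cleaning step is easy to reproduce outside NiFi. The following Python sketch (with a function name of my own) removes the wrapper and parses what is left:

```python
import json


def unwrap_flickr_json(body):
    # Strip the "jsonFlickrApi(" prefix and the trailing ")" if present,
    # then parse the remaining text as JSON.
    prefix, suffix = "jsonFlickrApi(", ")"
    text = body.strip()
    if text.startswith(prefix) and text.endswith(suffix):
        text = text[len(prefix):-len(suffix)]
    return json.loads(text)
```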

Then I want a system to handle the pagination aspect. For that I create a process group with one input port where I’ll send my FF, and with two output ports (I’ll come back to this very soon).

In this process group, I first use an EvaluateJsonPath processor to extract “page” and “pages” from the JSON string and get the values as attributes of my FF (note that this will update my existing “page” attribute).

evaluate_page

 

Then I use a RouteOnAttribute processor and I use the Expression language to check if the current page number (attribute “page”) is equal to the total number of pages (attribute “pages”).

page_equal_pages

Whether it is true or not, I route my FF to the first output port, which sends it to the next step of the flow. But if it is false, I also use an UpdateAttribute processor to increment the “page” attribute and get the next page number to request:

increment

Note: I also update the “filename” attribute to ensure my FFs have different filenames. It is useful when using a PutFile processor along the flow for debugging purposes.

Then I send this FF in the second output port which goes back to the InvokeHTTP processor but with a new page number. This way, it will loop over all pages to get all expected results.

The process group looks like:

processor_group
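The loop implemented by this group can be summarized in a few lines of Python. This is only a sketch of the logic: `fetch_page` is a hypothetical function standing in for the InvokeHTTP call plus the cleaning step, and it is assumed to return the “photos” object of the response.

```python
def fetch_all_pages(fetch_page):
    """Collect the photos of every page, starting from page 1."""
    page = 1
    photos = []
    while True:
        result = fetch_page(page)
        photos.extend(result["photo"])
        # Stop once the current page is the last one. Using >= instead of ==
        # also covers an empty result where "pages" is 0.
        if result["page"] >= result["pages"]:
            break
        page = result["page"] + 1
    return photos
```

In the NiFi flow, every page is forwarded to the next step as soon as it arrives instead of being accumulated in a list, but the stop condition is the same comparison between “page” and “pages”.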

I use the same logic for the photos published by the user and for the photos published by the user’s contacts. At this point, I can use a funnel to group all my results: I am ready to continue at the photo level. So far the flow looks like:

flow_first_part

From this point, I use a SplitJson processor to get individual photo information:

splitjson

Then I use an EvaluateJsonPath processor to extract the ID of each photo and store it as an attribute of my FF:

extractPhotoId

I am now able to call the two methods to get the tags and groups associated with the picture. I use the following Remote URLs:

Then, I clean the returned string once more to get a valid JSON string, and I again use SplitJson processors to get each tag and group into individual FFs.

Splitting list of tags:

splittags

Splitting list of groups:

splitgroup

Then I use an EvaluateJsonPath to extract tag name and group name as my new FF content:

tagname

At the end, I use a MergeContent processor to concatenate my FFs into bigger FFs with one tag/group name per line. For this processor I use the following parameters:

mergecontent

Note: for the demarcator parameter, I typed Shift+Enter to insert a line break.

This way, each merged FF contains 20k lines, unless my incoming queue does not reach this limit within 5 minutes.

I end my flow with a PutFile processor to store the results in a given directory.

The second part of the flow looks like:

flow_snd_part

That’s it! I have completed the whole flow!

I ran this flow for a given user: it represented about 100k photos with about 450k tag/group names to analyze. To do that, I used Apache Spark and the famous word count example to extract the most used keywords.

Once pyspark was launched, I used the following commands:

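import re
data = sc.textFile("D:/tmp/*")
# the outer parentheses allow the chained calls to span several lines
(data.flatMap(lambda line: re.sub('[^0-9a-zA-Z ]+', '', line).lower().split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
    # swap to (count, word) so that sortByKey orders by count
    .map(lambda kv: (kv[1], kv[0]))
    .sortByKey(False)
    .take(200))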

You can see that I did some cleaning on the characters and changed my strings to lower case. Once I have the 200 most used words, I do some manual cleaning to remove all unwanted things like “flickr”, “photo”, “the”, etc. And I finally get the 20 most used words related to the pictures I got through my flow for my given user:
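If you do not have Spark at hand, the same counting and cleaning can be sketched in plain Python for a dataset of this size. The function name and the stop-word list below are only illustrative (I did my own cleaning by hand), and the result is a list of (word, count) pairs rather than (count, word).

```python
import re
from collections import Counter

# Illustrative stop words only; extend the set to match your own cleaning.
STOP_WORDS = {"flickr", "photo", "the"}


def top_words(lines, n=20, stop_words=STOP_WORDS):
    counts = Counter()
    for line in lines:
        # Same cleaning as the Spark job: keep letters, digits and spaces,
        # then lower-case and split on whitespace.
        cleaned = re.sub(r"[^0-9a-zA-Z ]+", "", line).lower()
        counts.update(word for word in cleaned.split() if word not in stop_words)
    return counts.most_common(n)
```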

word_count_result

I hope you enjoyed going through this use case, and I am sure it will give you some ideas to have fun with Apache NiFi! Please feel free to comment or to ask questions about this post.

Get data from/to Dropbox using Apache NiFi

A few days ago, on the mailing list, a question was asked about the possibility of retrieving data from a smartphone using Apache NiFi. One suggestion was to use a cloud sharing service as an intermediary, like Box, Dropbox, Google Drive, AWS, etc.

Obviously, solutions already exist to sync data from these services to your computer. But we could imagine using Apache NiFi to sync data between such services, or even better: to have an account on each of these services and use Apache NiFi as a single endpoint to use and manage all your accounts as a single cloud storage facility. There are a lot of options out there.

For this post, let’s just use Apache NiFi to get data from/to a Dropbox account. I’ll let you use your imagination to develop amazing things!

First, I need to allow my account to be accessed by an external application. To do that, I go to this page and I click on the “Create App” button. Then I choose the “Dropbox API” and, in my case, I select the “App folder” type of access (only one directory will be accessible to my application). Finally, I choose the name of my application: “NIFI”. This will automatically create a folder Applications/NIFI in my Dropbox (this will be seen as the root directory of the Dropbox account from the application’s point of view: see it as “/”).

Now you need to generate an access token for your application. On the page of your application you have a link to generate the token. Once generated, keep it securely for yourself!

OK! We have everything we need! Just to be sure everything is working, you can check with curl that you have access:

curl https://api.dropbox.com/1/account/info -H "Authorization:Bearer <YOUR-ACCESS-TOKEN>"

Note: here is a post about the new Dropbox API.

Now let’s jump to Apache NiFi to have a flow to automatically download files from the Dropbox folder.

1. List content in Dropbox folder

This can be achieved using:

curl -X POST https://api.dropboxapi.com/2-beta-2/files/list_folder \
    --header "Authorization: Bearer <access-token>" \
    --header "Content-Type: application/json" \
    --data "{\"path\": \"\"}"
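To make the structure of this request explicit (method, two headers, JSON body), here is a Python sketch that builds the equivalent request object with the standard library, without sending it. The function name is hypothetical, and the endpoint is the one used in this post, which may have changed since.

```python
import json
import urllib.request

LIST_FOLDER_URL = "https://api.dropboxapi.com/2-beta-2/files/list_folder"


def build_list_folder_request(access_token, path=""):
    # An empty path designates the root of the app folder.
    body = json.dumps({"path": path}).encode("utf-8")
    return urllib.request.Request(
        LIST_FOLDER_URL,
        data=body,
        method="POST",
        headers={
            "Authorization": "Bearer " + access_token,
            "Content-Type": "application/json",
        },
    )
```

Passing the returned object to `urllib.request.urlopen` would actually issue the call; I leave that out here so the sketch can be run without a token.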

With NiFi, we have to use the InvokeHTTP processor. At the time of writing, it is necessary to use an incoming FlowFile to set the content to be sent with a POST request. I will shortly open an issue about that and, hopefully, it should be possible to directly set the body in NiFi 0.7.0.

OK, so to set the content of the request to

{"path": ""}

a workaround is to use a local file on your computer and to read it with a GetFile processor.

In conclusion, to issue the request to list the content of the folder we need:

list_files

The GetFile processor has the following properties:

  • regarding the scheduling, I set it CRON-based with “*/5 * * * * ?” to issue the HTTP request every 5 seconds
  • regarding the properties:

get_file.PNG

You notice that I reference a specific file in D:\tmp. This file is named list_files.json and its content is:

{"path": ""}

Also, I configure the processor not to delete the file (Keep Source File = true).

Every 5 seconds, a FlowFile with the expected content will be sent to the InvokeHTTP processor which has the following properties:

list_invokehttp.png

I set the method to POST and set the URL endpoint as specified by the Dropbox API. I manually set the Content-Type to “application/json” (the body will be the content of the incoming FlowFile). And I manually add a property with the key “Authorization” and the value “Bearer <Access Token>” (to be replaced with your access token). This will add the expected header.

That is it! At this point we are able to list the content of our Dropbox folder. The output of the InvokeHTTP processor (in the response relationship) will have content looking like:

{"entries": 
[
{".tag": "file", "name": "file_3.txt", "path_lower": "/file_3.txt", "path_display": "/file_3.txt", "id": "id:EoMhAg5eUKAAAAAAAAAAAQ", "client_modified": "2016-03-11T12:35:31Z", "server_modified": "2016-03-11T12:35:31Z", "rev": "145c5a2f7", "size": 13}, 
{".tag": "file", "name": "file_1.txt", "path_lower": "/file_1.txt", "path_display": "/file_1.txt", "id": "id:qNPLoI0buzAAAAAAAAAAAg", "client_modified": "2016-03-11T12:35:32Z", "server_modified": "2016-03-11T12:35:32Z", "rev": "245c5a2f7", "size": 13}, 
{".tag": "file", "name": "file_2.txt", "path_lower": "/file_2.txt", "path_display": "/file_2.txt", "id": "id:ib7z-SMEqYAAAAAAAAAAAQ", "client_modified": "2016-03-11T12:35:33Z", "server_modified": "2016-03-11T12:35:33Z", "rev": "345c5a2f7", "size": 13}
], 
"cursor": "AAG4qAdoMOQVhshkMwFchOjPUnMWuGIvrQkZMb3L1aFa9euMXonxsG0S0_RIfxfFxYMxoz2cKFqc3laWcyDdM5MixrJ3AZ4jAyebx5s70k69z6KrBTE_IUh4Vnd2UZCUIAA", "has_more": false}

We can see the three files I have added in my Dropbox folder.

2. Prepare requests to download files

Let’s have a look at the request we have to send to download a file:

curl -X POST https://api.dropboxapi.com/2-beta-2/files/download \
    --header "Authorization: Bearer <access-token>" \
    --header "Dropbox-API-Arg: {\"path\": \"/cupcake.png\"}"

So, we have to split our FlowFile to have one FlowFile per file we want to download. Let’s do that with a SplitJson processor with the following properties:

split_json

As we saw, the response is a JSON document containing an array (named “entries”) with each of the files present in the Dropbox folder. This processor is designed to output one FlowFile per entry of the JSON array located at the given JSON Path. At this point, we will have one FlowFile per file we want to download. Each one will have as content something like:

{".tag": "file", "name": "file_3.txt", "path_lower": "/file_3.txt", "path_display": "/file_3.txt", "id": "id:EoMhAg5eUKAAAAAAAAAAAQ", "client_modified": "2016-03-11T12:35:31Z", "server_modified": "2016-03-11T12:35:31Z", "rev": "145c5a2f7", "size": 13}

Now let’s extract some values from this JSON content and store them in attributes. For that, we use the EvaluateJsonPath processor (which takes JsonPath expressions) with the following properties:

evaluate_json_path

In this case I extract the value associated with “path_lower” in the JSON to set it as an attribute with the key “path_lower”. For example, an output FlowFile will have an attribute with the key “path_lower” and the value “/file_3.txt”.

I also override the “filename” attribute (this attribute already exists since it is a core attribute in NiFi). So far, this attribute contained “list_files.json”, inherited from my initial FlowFile at the very beginning of the flow. I replace it with the name of the file I want to download (otherwise my files would be downloaded with the name “list_files.json”).
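The combined effect of SplitJson and EvaluateJsonPath can be sketched in Python as follows. The function name is mine, and taking “filename” from the “name” field of each entry is an assumption, since the exact JSON Path I configured is only visible in the screenshot.

```python
import json


def entries_to_attributes(list_folder_body):
    # One dictionary of attributes per entry of the "entries" array,
    # mimicking one FlowFile per file to download.
    entries = json.loads(list_folder_body)["entries"]
    return [
        {"path_lower": entry["path_lower"], "filename": entry["name"]}
        for entry in entries
    ]
```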

The last thing to do before being able to call a new InvokeHTTP processor is to clean the FlowFile content. This is due to the fact that the Dropbox API is not expecting any content in the request and will return an error if there is any.

This is done with a ReplaceText processor with the following properties:

replace_text

Here, I just delete all the content of my FlowFile to only keep the attributes.

3. Download files

We are now ready to configure a last InvokeHTTP processor to download our files.

Note: the Dropbox API expects a POST request with no Content-Type; otherwise it returns an error. It is a rather strange behavior, but there is no choice here… It was not possible, with the processor, to send a POST request without Content-Type, so I opened a JIRA and proposed a fix that should be available in NiFi 0.6.0.

Here are the properties of the processor:

download_invokehttp

I set the method to POST and set the expected endpoint URL. I explicitly set the Content-Type to an empty string. And I manually add two properties that will be sent as headers: one for authorization (to be changed with your access token) and one with the expected JSON (using NiFi Expression Language to get the value of the “path_lower” attribute we extracted earlier) to specify the file to download.
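The shape of this request (POST, no body, no Content-Type, and the file path passed as JSON in a header) can be sketched in Python like this. The function name is hypothetical, the endpoint is the one used in this post, and nothing is sent.

```python
import json
import urllib.request

DOWNLOAD_URL = "https://api.dropboxapi.com/2-beta-2/files/download"


def build_download_request(access_token, path_lower):
    # No data is attached: the file to download is described
    # only by the Dropbox-API-Arg header.
    return urllib.request.Request(
        DOWNLOAD_URL,
        method="POST",
        headers={
            "Authorization": "Bearer " + access_token,
            "Dropbox-API-Arg": json.dumps({"path": path_lower}),
        },
    )
```

As far as I know, `urllib` only adds a default Content-Type when a body is present, so a request built this way should go out without that header.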

The final step is to add a PutFile processor to put the downloaded files into the desired location. Here are the properties:

putfile

That’s all! We now have a full flow ready to download files from your Dropbox account!

download_flow.PNG

4. Upload files

From this point, it is easy to have the same logic and to upload files based on the needed request:

curl -X POST https://api.dropboxapi.com/2-beta-2/files/upload \
     --header "Authorization: Bearer <access-token>" \
     --header "Content-Type: application/octet-stream" \
     --header "Dropbox-API-Arg: {\"path\": \"/cupcake.png\", \"mode\": \"overwrite\"}" \
     --data-binary @local-file.png

We just need to link a similar GetFile processor (as described before) with a new InvokeHTTP processor with the following properties:

upload_file.png

This way, it will upload the incoming FlowFile with the correct file name at the root of your Dropbox directory (don’t forget to use your own Dropbox access token).

In conclusion, for upload, the flow is as simple as:

upload_flow.PNG

That is it! I think you have everything you need to enjoy NiFi if you want to play with it to handle content on your Dropbox account.

Here is the template of the full upload/download flow (don’t forget to update it with your access token).

Feel free to add comments and ask questions about this post!

Transform data with Apache NiFi

A few days ago, I started to have a look at Apache NiFi, which is now part of the Hortonworks Data Flow distribution (HDF). Based on my experience at Capgemini and the kind of projects in which I have been involved, I immediately realized that it is a powerful system that can be used in a wide range of situations and problems.

Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data. It supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic. Some of the high-level capabilities and objectives of Apache NiFi include:

  • Web-based user interface
    • Seamless experience between design, control, feedback, and monitoring
  • Highly configurable
    • Loss tolerant vs guaranteed delivery
    • Low latency vs high throughput
    • Dynamic prioritization
    • Flow can be modified at runtime
    • Back pressure
  • Data Provenance
    • Track dataflow from beginning to end
  • Designed for extension
    • Build your own processors and more
    • Enables rapid development and effective testing
  • Secure
    • SSL, SSH, HTTPS, encrypted content, etc…
    • Pluggable role-based authentication/authorization

It must also be noted that it is really easy to get started with NiFi (whatever OS you are using): just download it, run it, and open your favorite web browser at localhost:8080.

Now you may tell yourself that it is just a tool to get data from a point A to a point B according to parameters, conditions, etc. In other words, you may think that it is just a tool to extract and load data. What are the transformation features offered by NiFi?

Just have a look at the list of 125+ processors available at this moment and you will have a good idea of what you can achieve. But let’s say you need to do some very specific actions on your data and you don’t see any processor suited to your need. So what’s next?

First, you can write your own processor: it is very easy and straightforward (NiFi developer guide). Otherwise, you can leverage some existing processors to execute custom code. For example, you can achieve a lot with the ExecuteScript and ExecuteStreamCommand processors. If you are interested in the first one, have a look at this blog to find useful and complete examples.

In this post, I want to focus on ExecuteStreamCommand and how it can be used to define data transformation flows. One common use case I see is to get files from one place and execute an application to transform the files.

For simplicity, let’s say I have an input directory in which files to be processed are coming. For each file A of this directory, I want to execute an application to transform this data into a new file B:

flow_AtoB

This can be easily achieved by the combination of two processors: ListFiles and ExecuteStreamCommand.

Here is an example running on Windows: I look for any new file in “D:\tmp-input” with a ListFiles processor using the following properties:

listfiles

For each new file coming in this directory, the processor will generate a FlowFile (see NiFi documentation to learn about NiFi principles) with some useful attributes and no content.

Now, I have a batch file that I want to be executed on each file. This batch file takes exactly one parameter which is the path of the file to be processed.

My batch file (command.bat) is very simple: it moves the given file into another directory (obviously, this is just an example; if we just want to move files, NiFi can do that without executing commands!):

@echo off
MOVE %1 "D:\tmp" >nul

Then my ExecuteStreamCommand will have the following properties:

executeStreamCommand

Using the Expression Language provided with NiFi, I can extract the path of the file previously listed by ListFiles processor by extracting information from the FlowFile attributes.

${absolute.path}${filename}

This is the concatenation of attributes “absolute.path” and “filename” from the incoming FlowFile (attributes set by ListFiles processor).

The processor can send to the executed process the content of the incoming FlowFile, but in my case there is no content and I don’t want such a thing (Ignore STDIN = true). Besides, this processor can create a new FlowFile using the output of the command as content of the newly created FlowFile. But this is something I don’t want with this command, so I set the property “Output Destination Attribute” with the value “result”. This way, the output of my command is used as a new (or updated) attribute of my original FlowFile.

If you don’t need this FlowFile anymore, you can auto-terminate the processor, and you have a ready-to-go flow for processing files using your own command.

If you want another process to run on the newly created file (in D:\tmp), you can copy/paste this flow logic and use another ListFiles processor to scan the directory.

basic_flow.PNG

So far, it is a very basic operation. Something closer to a real use case would be to create a flow where the ExecuteStreamCommand processor gets information back from the executed command and passes it to the next processor.

Let’s transform my batch file to return the parameters to be passed to the next processor. In my example, my batch file moves the file given as the first parameter to the file path given as the second parameter. At the end, it displays the new path of the file (second parameter) and the third parameter, to be used in the next processor (again, this is an example; in the real world such parameters would probably be computed by the executed process itself).

@echo off
ECHO %* >> %1
MOVE %1 %2 >nul
ECHO "%2;%3"

Note: since I configured my processor to accept arguments delimited with “;” I display parameters to be passed with this delimiter. It is possible to change the delimiter in the processor properties (I recommend reading the usage documentation associated to each processor, this is very helpful).

Let’s have a look at the configuration of my first ExecuteStreamCommand processor:

first_execute

This will execute my batch file with:

  • 1st parameter:  path of my input file listed by ListFiles processor
  • 2nd parameter: destination path of my file
  • 3rd parameter: parameter to be passed to next processor

My batch file, once called, will move my file to “D:\step1.txt” and will display:

“D:\\step1.txt;D:\\step2.txt”

This will be set as the value of the attribute with “result” as key in the generated FlowFile.

The second ExecuteStreamCommand processor has the following configuration:

second_execute

Here, I use Expression Language functions (replaceAll) to remove special characters introduced in the process (it is possible to adapt the batch script to avoid this operation).
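To illustrate the kind of cleaning involved, here is a Python sketch that turns the raw output of the batch file into a list of arguments. It is not the exact replaceAll expression I used (that one is in the screenshot above); the function name is mine, and I assume the output is wrapped in double quotes and followed by a line break.

```python
def parse_result(result, delimiter=";"):
    # Drop the trailing line break and the surrounding double quotes,
    # then split on the delimiter and ignore empty arguments.
    cleaned = result.strip().strip('"')
    return [part for part in cleaned.split(delimiter) if part]


args = parse_result('"D:\\step1.txt;D:\\step2.txt"\r\n')
```

With this input, `args` holds the two paths that become the first and second parameters of the next command; a missing third parameter simply produces a shorter list.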

 

This will execute my batch file with:

  • 1st parameter:  D:\\step1.txt
  • 2nd parameter: D:\\step2.txt
  • 3rd parameter: null

My batch file, once called, will move my file to “D:\step2.txt” and will display:

“D:\\step2.txt”

In conclusion, by taking advantage of the Expression Language and by controlling the output of the executed commands, I can use the FlowFile initially created by ListFiles processors to carry information along the flow and propagate information and arguments to following steps.

For a better understanding, here is a picture of how the flow file has evolved through the flow:

flow.PNG

Between the ListFiles processor and the first ExecuteStreamCommand processor, the flow file has the following attributes:

flow_file_1

Then between the two ExecuteStreamCommand processors:

flow_file_2

And at the end of the flow (if we want to add other processors):

flow_file_3

As a side note, we can see that the return code of the executed command is available in the FlowFile with the attribute “execution.status”. This makes it easy to route FlowFiles depending on the return code using a RouteOnAttribute processor. Thus, it is possible to implement “catch” / “retry” strategies in case of error.
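As an illustration of such a “retry” strategy, here is a Python sketch of the logic outside NiFi: run a command, look at its return code, and run it again while it fails. The function name and the `max_attempts` parameter are hypothetical; in NiFi the loop would be a RouteOnAttribute relationship going back to the ExecuteStreamCommand processor.

```python
import subprocess
import sys


def run_with_retry(command, max_attempts=3):
    """Run `command` until it exits with status 0 or attempts run out."""
    completed = None
    for attempt in range(1, max_attempts + 1):
        completed = subprocess.run(command, capture_output=True, text=True)
        if completed.returncode == 0:
            break  # equivalent of routing to the "success" branch
    return {
        "execution.status": completed.returncode,
        "result": completed.stdout.strip(),
        "attempts": attempt,
    }


# Example with a command that always succeeds.
outcome = run_with_retry([sys.executable, "-c", "print('done')"])
```

The returned dictionary mirrors the attributes seen in the flow: the return code under “execution.status” and the command output under “result”.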

Please feel free to comment/ask questions about this post!