Using the SmartSense Activity Explorer for cluster reporting

In the Hortonworks Data Platform, there is SmartSense, a service that analyzes cluster diagnostic data, identifies potential issues, and recommends specific solutions and actions.

SmartSense is made of multiple components and one of the component is the Activity Explorer which is a customized Zeppelin notebook used to access and display the data collected by the Activity Analyzer instances stored in an HBase instance and accessed using Phoenix.

The Activity Explorer gives access to a lot of very useful data when administrating a cluster. For an exhaustive list, have a look to the documentation here.

By default, this Activity Explorer / Zeppelin is configured with the Phoenix interpreter only. The idea of this post is to describe how we can add the JDBC interpreter (or any other interpreter) to allow administration teams using this specific Zeppelin instance as a more general tool for cluster reporting.

One might wonder why I’m not using the Zeppelin service available in the HDP stack. The reason is quite simple: usually, the Zeppelin instances would be deployed on the edge nodes (to be used by the project teams / users of the cluster) while the Activity Explorer would be deployed on an administration node and only accessed by the administrators of the cluster. The idea is to keep Zeppelin instances separated based on the purpose.

First step is to package the JDBC interpreter. Go on the node where you installed the Zeppelin service (where all the interpreters are installed) – not the node where you installed the Activity Explorer component.

cd /usr/hdp/current/zeppelin-server/interpreter/
zip -r jdbc.zip jdbc/

And deploy this ZIP file on the node where is installed the Activity Explorer:

cd /usr/hdp/share/hst/activity-explorer/interpreter/
unzip jdbc.zip

Restart the Activity Explorer component so that the interpreter is available for configuration.

Go to the interpreter configuration page and add a new one, selecting the JDBC type. Configure the interpreter as needed based on your cluster (you can check the configuration you set for this interpreter in the Zeppelin service). In particular, you’ll need:

zeppelin.jdbc.auth.type=KERBEROS
zeppelin.jdbc.principal=<principal of the activity explorer>
zeppelin.jdbc.keytab.location=<keytab of the activity explorer>
hive.proxy.user.property=hive.server2.proxy.user

Note: do not use _HOST in the principal name, use the host FQDN instead.

I also strongly recommend you to configure SSL on the Activity Explorer as well as configuring proper authentication/authorization mechanisms. You can do all that through Ambari as you’d do for the Zeppelin service (have a look at the documentation here).

Since the Activity Explorer account is going to proxy your requests to Hive through the JDBC interpreter, you need to add the proper proxy rules:

hadoop.proxyuser.activity_explorer.groups=<administrator group>
hadoop.proxyuser.activity_explorer.hosts=<activity explorer host>

And you’ll have to restart the appropriate services.

If you stop here and restart the Activity Explorer component, you’ll loose your JDBC interpreter configuration because all of the interpreter configuration of the Activity Explorer is managed by Ambari and reset at each component restart. To prevent the loss of your configuration, you need to copy the content of the file:

/etc/smartsense-activity/conf/interpreter.json

(content of this file has been updated by the Activity Explorer after you added the JDBC interpreter)

And paste this content in Ambari / SmartSense / Advanced / Advanced activity-zeppelin-interpreter. This way, your configuration will remain the same.

Note: keep in mind that all this procedure might have to be done again after a SmartSense upgrade since it’s not the default deployment.

You’re now all set! If you’re wondering what can be done with the JDBC interpreter to enhance the cluster administration tasks… the first thing I can recommend is to create Hive tables on top of the Ranger audits stored in HDFS so that you can create long term reports based on all the cluster audits (if you’re using Solr for the Ranger audits, this data is only stored for a short period of time, default is 90 days). Creating Hive tables on top of the data sitting in HDFS can really be useful if you have compliance/security teams looking for audits reporting.

You could also use the JDBC interpreter to directly access the data in the database backend used for some services like Ambari, Ranger, Hive, etc. It can provide interesting data to build useful reports.

As always, thanks for reading, and feel free to ask questions / leave a comment.

Monitoring NiFi – Ambari & Grafana

Note – This article is part of a series discussing subjects around NiFi monitoring.

When using Apache NiFi (note that version 1.2.0 is now released!) as part of HDF, a lot of of things are simplified using Apache Ambari to deploy NiFi and manage its configuration. Also, using Ambari Metrics service and Grafana, you have a way to easily and visually monitor NiFi performances. And you can also use Apache Ranger to centralize the authorizations management for multiple components (NiFi, Kafka, etc) in one single place.

This article will discuss how you can use Ambari Metrics and Grafana to improve your NiFi monitoring. Let’s start with a quick discussion around AMS (Ambari Metrics System). By default this service is running a Metrics Collector with an embedded HBase instance (and a Zookeeper instance) to store all the metrics, and Ambari will also deploy Metrics Monitor instances on all the nodes of the cluster. The monitors will collect the metrics at system level and send the metrics to the collector. However, the collector also exposes a REST API and that’s what NiFi is going to use with the AmbariReportingTask.

GrafanaBlogOverview

Source and documentation is on the Hortonworks website here.

When using HDF, the Ambari Reporting task should be already up and running for you. If not, you can add it and configure it with a frequency of one minute (it does matter) and use the following parameters:

Screen Shot 2017-05-11 at 9.30.46 AM

Note that “ambari.metrics.collector.url” is an environment variable already set for you when Ambari is starting NiFi. You could also directly give the address, in my case:

http://pvillard-hdf-1:6188/ws/v1/timeline/metrics

Once this reporting task is up and running, you should be able to see the metrics on the NiFi service page in Ambari:

Screen Shot 2017-05-11 at 9.38.57 AM.png

Also, you can go into Grafana to display dashboards with the metrics of your components. You have pre-configured dashboards and here is the one for NiFi:

Screen Shot 2017-05-11 at 9.46.03 AM.png

Now, all the metrics we have here are at cluster level. We are not able to display metrics for specific workflows. With the latest release of Apache NiFi (1.2.0), there is now an optional parameter in the AmbariReportingTask to specify a process group ID. This way, by creating a second reporting task (keep the one providing cluster-level metrics) and by specifying the ID of a specific process group, you can actually create your Grafana dashboards at workflow level.

Let’s say I’ve the following workflow:

Screen Shot 2017-05-11 at 9.52.49 AM

And inside my process group, I have:

Screen Shot 2017-05-11 at 9.52.59 AM.png

Now, my process group having the ID “75973b6e-2d38-1cf3-ffff-fffffdea8cbc”, I can define the following Ambari reporting task:

Screen Shot 2017-05-11 at 9.54.50 AM.png

Note – you must keep “nifi” as the Application ID as it has to match the configuration of the Ambari Metrics System.

Once your reporting task is running, in Grafana, you can create your own dashboard for this workflow and display the metrics you want:

Screen Shot 2017-05-11 at 10.08.59 AM.png

For my Kafka example, here is the dashboard I defined:

Screen Shot 2017-05-11 at 10.39.47 AM.png

In this example, I can see that my workflow is running fine but the free disk space on one of my node is decreasing very quickly. It turns out that when my disk is completely filled, back pressure will be enabled in my workflow and there is no more data sent to Kafka. Instead data is queued in NiFi.

This simple example gives me a lot of information:

  • Everything is default configuration in Ambari and I chose my three NiFi nodes to also host Kafka brokers. By default, for Kafka, the replication factor is set to 1, the number of partitions is set to 1 and the automatic creation of topic is allowed (that’s why I didn’t need to create the topic before starting my workflow). Because of the default parameters, all of the data is sent to only one Kafka broker (pvillard-hdf-2) and that’s why the disk space is quickly decreasing on this node since my three NiFi nodes are sending data to this broker.
  • Also, we clearly see that’s not a good idea to collocate NiFi and Kafka on the same nodes since they are both IO intensive. In this case, they are using the same disk… and we can see that the task duration (for NiFi) is clearly higher on the Kafka node that is receiving the data (pvillard-hdf-2). Long story short: keep NiFi and Kafka on separated nodes (or at the very least with different disks).

With HDF and the Ambari Metrics System, it gives you the ability to create custom relevant dashboards for specific use cases. It also allows you to mix information from Kafka, from NiFi and from the hosts to have all the needed information in one single place.

Also, by using the REST API of the Metrics Collector (you may be interested by this article), you could also send your own data (not only the data gathered at the process group level) to add more information into your dashboards. An example that comes in mind would be to send the lineage duration (see Monitoring of Workflow SLA) at the end of the workflow using an InvokeHTTP processor and sending a JSON payload using a POST request to the API endpoint.

Let’s say I want to monitor how long it takes between my GenerateFlowFile and the end of my workflow to check if some particular events are taking longer. Then I could have something like:

Screen Shot 2017-05-11 at 5.58.02 PM.png

What am I doing here? I want to send to AMS the information about the lineage duration of the flow files I sent into my Kafka topic. However I don’t want to send the duration of every single event (that’s not really useful and it’s going to generate a lot of requests/data). Instead I want to make an API call only once per minute. The idea is to compute the mean and max of the lineage duration with a rolling window of one minute and to only send this value to AMS.

I could use the new AttributeRollingWindow processor but it is not as fast as the PublishKafka and I don’t want to generate back pressure in my relationships. So I use the InvokeScriptedProcessor to build my own rolling processor (it’s faster because I am not using any state information):

  • this processor takes a frequency duration as a parameter (that I’ll set to 1 minute in this example)
  • for every flow file coming in, it will extract the lineage start date to compute max and mean lineage duration over the rolling window. If the last flow file sent in the success relationship was less than one minute ago, I’ll route the flow file to drop relationship (that I set to auto-terminated). If it was more than one minute ago, I update the attributes of the current flow file with the mean and max of all the flow files since the last “success” flow file and route this flow file in the success relationship

Since I’ve flow files coming in my processor at a high rate, I know that my processor will release one flow file every minute with the mean and max of the linage duration for the flow files of the last minute.

Then I use a ReplaceText processor to construct the JSON payload that I’ll send to the Metrics Collector using the InvokeHttp processor.

Here is the configuration of the InvokeScriptedProcessor:

Screen Shot 2017-05-11 at 7.54.00 PM

The Groovy script used can be found here.

Then I create the JSON payload with the ReplaceText processor:

Screen Shot 2017-05-11 at 7.56.33 PM.png

Note that I use the ID of the processor as the “instanceid” attribute in the JSON.

Then, I use the InvokeHttp processor (with a scheduling/frequency of 1 minute):

Screen Shot 2017-05-11 at 7.57.45 PM.png

Now, I can use this information to build the corresponding graph in my Grafana dashboard:

Screen Shot 2017-05-11 at 7.59.27 PM

I can see that, in average, it takes about 150 milliseconds to generate my flow file, publish it in my Kafka topic and get it into my scripted processor. I could also generate one metric per host of my cluster to check if a node is performing badly compared to the others.

Now you can easily send your custom data into AMS and create dashboards for your specific use cases and workflows.

As usual feel free to ask questions and comment this post.

Monitoring NiFi – Introduction

Apache NiFi 1.2.0 has just been released with a lot of very cool new features… and I take this opportunity to start a series of articles around monitoring. This is a recurring subject and I often hear the same questions. This series won’t provide an exhaustive list of the ways you can use to monitor NiFi (with or without HDF) but, at least, it should get you started!

Here is a quick summary of the subjects that will be covered:

For this series of article, I will use, as a demo environment, a 4-nodes HDF cluster (running on CentOS 7):

I’m using HDF to take advantage of Ambari to ease the deployment but this is not mandatory for what I’m going to discuss in the articles (except for stuff around the Ambari reporting task obviously).

I will not cover how setting up this environment but if this is something you are looking after, feel free to ask questions (here or on the Hortonworks Community Connection) and to have a look into Hortonworks documentation about HDF.

I don’t want to write a single (very) long article and for the sake of clarity there is one article per listed subject. Also, I’ll try to update the articles to stick as best as possible to latest features provided by NiFi over time.

Also, if you feel that some subjects should be added to the list, let me know and I’ll do my best to cover other monitoring-related questions.

Apache NiFi 1.0.0 – Cluster setup

As you may know a version 1.0.0-BETA of Apache NiFi has been released few days ago. The upcoming 1.0.0 release will be a great moment for the community as it it will mark a lot of work over the last few months with many new features being added.

The objective of the Beta release is to give people a chance to try this new version and to give a feedback before the official major release which will come shortly. If you want to preview this new version with a completely new look, you can download the binaries here, unzip it, and run it (‘./bin/nifi.sh start‘ or ‘./bin/run-nifi.bat‘ for Windows), then you just have to access http://localhost:8080/nifi/.

The objective of this post is to briefly explain how to setup an unsecured NiFi cluster with this new version (a post for setting up a secured cluster will come shortly with explanations on how to use a new tool that will be shipped with NiFi to ease the installation of a secured cluster).

One really important change with this new version is the new paradigm around cluster installation. From the NiFi documentation, we can read:

Starting with the NiFi 1.0 release, NiFi employs a Zero-Master Clustering paradigm. Each of the nodes in a NiFi cluster performs the same tasks on the data but each operates on a different set of data. Apache ZooKeeper elects one of the nodes as the Cluster Coordinator, and failover is handled automatically by ZooKeeper. All cluster nodes report heartbeat and status information to the Cluster Coordinator. The Cluster Coordinator is responsible for disconnecting and connecting nodes. As a DataFlow manager, you can interact with the NiFi cluster through the UI of any node in the cluster. Any change you make is replicated to all nodes in the cluster, allowing for multiple entry points to the cluster.

zero-master-cluster

OK, let’s start with the installation. As you may know it is greatly recommended to use an odd number of ZooKeeper instances with at least 3 nodes (to maintain a majority also called quorum). NiFi comes with an embedded instance of ZooKeeper, but you are free to use an existing cluster of ZooKeeper instances if you want. In this article, we will use the embedded ZooKeeper option.

I will use my computer as the first instance. I also launched two virtual machines (with a minimal Centos 7). All my 3 instances are able to communicate to each other on requested ports. On each machine, I configure my /etc/hosts file with:

192.168.1.17 node-3
192.168.56.101 node-2
192.168.56.102 node-1

I deploy the binaries file on my three instances and unzip it. I now have a NiFi directory on each one of my nodes.

The first thing is to configure the list of the ZK (ZooKeeper) instances in the configuration file ‘./conf/zookeeper.properties‘. Since our three NiFi instances will run the embedded ZK instance, I just have to complete the file with the following properties:

server.1=node-1:2888:3888
server.2=node-2:2888:3888
server.3=node-3:2888:3888

Then, everything happens in the ‘./conf/nifi.properties‘. First, I specify that NiFi must run an embedded ZK instance, with the following property:

nifi.state.management.embedded.zookeeper.start=true

I also specify the ZK connect string:

nifi.zookeeper.connect.string=node-1:2181,node-2:2181,node-3:2181

As you can notice, the ./conf/zookeeper.properties file has a property named dataDir. By default, this value is set to ./state/zookeeper. If more than one NiFi node is running an embedded ZK, it is important to tell the server which one it is.

To do that, you need to create a file name myid and placing it in ZK’s data directory. The content of this file should be the index of the server as previously specify by the server. property.

On node-1, I’ll do:

mkdir ./state
mkdir ./state/zookeeper
echo 1 > ./state/zookeeper/myid

The same operation needs to be done on each node (don’t forget to change the ID).

If you don’t do this, you may see the following kind of exceptions in the logs:

Caused by: java.lang.IllegalArgumentException: ./state/zookeeper/myid file is missing

Then we go to clustering properties. For this article, we are setting up an unsecured cluster, so we must keep:

nifi.cluster.protocol.is.secure=false

Then, we have the following properties:

nifi.cluster.is.node=true
nifi.cluster.node.address=node-1
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

I set the FQDN of the node I am configuring, and I choose the arbitrary 9999 port for the communication with the elected cluster coordinator. I apply the same configuration on my other nodes:

nifi.cluster.is.node=true
nifi.cluster.node.address=node-2
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

and

nifi.cluster.is.node=true
nifi.cluster.node.address=node-3
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

We have configured the exchanges between the nodes and the cluster coordinator, now let’s move to the exchanges between the nodes (to balance the data of the flows). We have the following properties:

nifi.remote.input.host=node-1
nifi.remote.input.secure=false
nifi.remote.input.socket.port=9998
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec

Again, I set the FQDN of the node I am configuring and I choose the arbitrary 9998 port for the Site-to-Site (S2S) exchanges between the nodes of my cluster. The same applies for all the nodes (just change the host property with the correct FQDN).

It is also important to set the FQDN for the web server property, otherwise we may get strange behaviors with all nodes identified as ‘localhost’ in the UI. Consequently, for each node, set the following property with the correct FQDN:

nifi.web.http.host=node-1

And that’s all! Easy, isn’t it?

OK, let’s start our nodes and let’s tail the logs to see what’s going on there!

./bin/nifi.sh start && tail -f ./logs/nifi-app.log

If you look at the logs, you should see that one of the node gets elected as the cluster coordinator and then you should see heartbeats created by the three nodes and sent to the cluster coordinator every 5 seconds.

You can connect to the UI using the node you want (you can have multiple users connected to different nodes, modifications will be applied on each node). Let’s go to:

http://node-2:8080/nifi

Here is what it looks like:

Screen Shot 2016-08-13 at 7.33.08 PM

As you can see in the top-left corner, there are 3 nodes in our cluster. Besides, if we go in the menu (button in the top-right corner) and select the cluster page, we have details on our three nodes:

Screen Shot 2016-08-13 at 7.35.28 PM

We see that my node-2 has been elected as cluster coordinator, and that my node-3 is my primary node. This distinction is important because some processors must run on a unique node (for data consistency) and in this case we will want it to run “On primary node” (example below).

We can display details on a specific node (“information” icon on the left):

Screen Shot 2016-08-13 at 7.35.48 PM

OK, let’s add a processor like GetTwitter. Since the flow will run on all nodes (with balanced data between the nodes), this processor must run on a unique processor if we don’t want to duplicate data. Then, in the scheduling strategy, we will choose the strategy “On primary node”. This way, we don’t duplicate data, and if the primary node changes (because my node dies or gets disconnected), we won’t loose data, the workflow will still be executed.

Screen Shot 2016-08-13 at 7.45.19 PM

Then I can connect my processor to a PutFile processor to save the tweets in JSON by setting a local directory (/tmp/twitter):

Screen Shot 2016-08-13 at 7.52.25 PM

If I run this flow, all my JSON tweets will be stored on the primary node, the data won’t be balanced. To balance the data, I need to use a RPG (Remote Process Group), the RPG will connect to the coordinator to evaluate the load of each node and balance the data over the nodes. It gives us the following flow:

Screen Shot 2016-08-13 at 8.00.26 PM

I have added an input port called “RPG”, then I have added a Remote Process Group that I connected to ” http://node-2:8080/nifi ” and I enabled transmission so that the Remote Process Group was aware of the existing input ports on my cluster. Then in the Remote Process Group configuration, I enabled the RPG input port. I then connected my GetTwitter to the Remote Process Group and selected the RPG input port. Finally, I connected my RPG input port to my PutFile processor.

When running the flow, I now have balanced data all over my nodes (I can check in the local directory ‘/tmp/twitter‘ on each node).

That’s all for this post. I hope you enjoyed it and that it will be helpful for you if setting up a NiFi cluster. All comments/remarks are very welcomed and I kindly encourage you to download Apache NiFi, to try it and to give a feedback to the community if you have any.

US presidential election via Twitter using Apache NiFi, Spark, Hive and Zeppelin

This article describes a frequency and sentiment analysis based on real-time tweets streams in relation to the four main candidates in the US Presidential Election.

The main objective was to deploy and to test the available connector between Apache NiFi and Apache Spark, so I decided to implement the following use case:

At the end, I get real time analytics such as:

  • frequency of tweets along the time per candidate
  • percentage of negative, positive and neutral tweets per candidate
  • opinion trends along the time for each candidate

The article is available on Hortonworks Community Connection website. And as always, please feel free to comment and/or ask questions.

OAuth 1.0A with Apache NiFi (Twitter API example)

A lot of API are using OAuth  protocol to authorize the received requests and to check if everything is OK regarding the identity of the request sender.

OAuth is an open standard for authorization, commonly used as a way for Internet users to log into third party websites using their Microsoft, Google, Facebook, Twitter, One Network etc. accounts without exposing their password. Generally, OAuth provides to clients a “secure delegated access” to server resources on behalf of a resource owner. It specifies a process for resource owners to authorize third-party access to their server resources without sharing their credentials. Designed specifically to work with Hypertext Transfer Protocol (HTTP), OAuth essentially allows access tokens to be issued to third-party clients by an authorization server, with the approval of the resource owner. The third party then uses the access token to access the protected resources hosted by the resource server.

As a remark, there are two versions of the protocol currently used out there: 1.0A and 2.0. As far as I know, 1.0A is the most commonly used. I already faced the need to use OAuth 1.0A protocol with the Flickr API but, back then, I found a way to get my data differently.

Recently, a question was asked on the Hortonworks Community Connection regarding the use of Apache NiFi to get data from Twitter API using OAuth 1.0A protocol. So this time, I decided to have a look on this and to get the job done.

This post presents the flow I used to perform a request against Twitter API using OAuth protocol. It gives me the opportunity to use for the first time the ExecuteScript processor which allows user to execute custom scripts on the fly inside NiFi (you will find a lot of examples on this great site).

Note 1: this was the first time I used Groovy language, be nice with me!

Note 2: I didn’t test the flow on a lot of methods. Some modifications may be necessary for some cases.

OK. The objective was to request the “users/lookup” method of the Twitter API. You can read the documentation here.

I want to perform a HTTP GET on:

https://api.twitter.com/1.1/users/lookup.json?screen_name=twitterapi,twitter

So far it seems really easy to do with a simple InvokeHTTP processor. The thing is you need to identify yourself when sending the request. Here comes the OAuth protocol. The official specification for 1.0A can be read here. But in the case of the Twitter API, you have a nice documentation here.

Besides on the documentation of each method, you have an OAuth Signature Generator that can be accessed (if you have defined a Twitter App). The generator is here. It lets you play around and gives great insights on each request to debug your own implementation of OAuth protocol.

The global idea is: you have a lot of input parameters and you must follow the specifications to construct a string based on the parameters. This string will be the value associated to “Authorization” key in HTTP header properties.

Here is the list of the needed parameters. First the parameters directly linked to your request:

Then the global parameters related to OAuth:

  • Consumer key (private information of your app provided by Twitter)
  • Consumer secret (private information of your app provided by Twitter)
  • Nonce (random string, uniquely generated for each request)
  • Signature method (with Twitter it is HMAC-SHA1)
  • Timestamp (in seconds)
  • Token (private information of your app provided by Twitter)
  • Token secret (private information of your app provided by Twitter)
  • Version (in this case 1.0)

The first step is to construct the “signature base string“. For that you first need to create the “parameter string“. All is very well explained here. Once you have the signature base string, you can encode it using HMAC-SHA1 and you easily get the header property to set in your HTTP request:

Authorization: OAuth oauth_consumer_key="*******", oauth_nonce="a9ab2392e5158a4c4e029c7829164571", oauth_signature="4s4Hi5hQ%2FoLKprW7qsRlImds3Og%3D", oauth_signature_method="HMAC-SHA1", oauth_timestamp="1460453975", oauth_token="*******", oauth_version="1.0"

Let’s get into the details using Apache NiFi. Here is the flow I created:

oauthFlow

I use a GenerateFlowFile to generate an empty Flow File (FF) in order to execute my flow. Then I use an UpdateAttribute processor to add attributes to my FF. In this case, I only add the parameters related to the specific request I want to execute:

globalParam

Then I send my FF into a process group that will compute the header property to set (I will come back to this part later). Then I perform my request using the InvokeHTTP processor:

invokeHTTP

I set the method to GET, the URL to my corresponding FF attribute, the content type to text/plain and I add a custom property named Authorization with the FF attribute I created in my process group (see below). This custom property will be added as a HTTP header in the request. At the end, I use a PutFile processor to write the result of my request in a local directory.

Let’s go to the interesting part of our flow where all the magic is: the process group I named OAuth 1.0A. Here it is:

processGroup

I just use two processors. The first one is an UpdateAttribute to add all the parameters I need as attributes of my FF. the second one is an ExecuteScript processor where I put my groovy code to compute the header property.

First… the parameters:

oauthParameters

Note: I use Expression Language provided by NiFi for some attributes.

  • arguments is used to extract the argument part in my target URL. In this example: screen_name=twitterapi,twitter
  • base_url is the URL I request without any argument. In this example: https://api.twitter.com/1.1/users/lookup.json
  • For the nonce parameter I use the “UUID” method of the expression language which generated a random string and I just take to replace the ‘-‘ characters to only keep an alphanumeric string.
  • For the timestamp, I use the “now” method of the expression language and I use substring to only keep seconds.

Let’s move to the ExecuteScript part. I set the script engine to Groovy and I put my script code in the “script body” property. The full code is available at the end of the post. Let’s go through it piece by piece.

First thing, I want to trigger my code only when a FF is available:

def flowFile = session.get()
if (!flowFile) return

Then I define a method I will use for the HMAC-SHA1 encoding:

def static hmac(String data, String key) throws java.security.SignatureException
{
    String result
    try {
        // get an hmac_sha1 key from the raw key bytes
        SecretKeySpec signingKey = new SecretKeySpec(key.getBytes(), "HmacSHA1");
        // get an hmac_sha1 Mac instance and initialize with the signing key
        Mac mac = Mac.getInstance("HmacSHA1");
        mac.init(signingKey);
        // compute the hmac on input data bytes
        byte[] rawHmac = mac.doFinal(data.getBytes());
        result= rawHmac.encodeBase64()
    } catch (Exception e) {
        throw new SignatureException("Failed to generate HMAC : " + e.getMessage());
    }
    return result
}

For this part, I will need to add some imports at the beginning of my script body:

import java.security.SignatureException
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec

Then I retrieve all the attributes of my FF and I extract some attributes I don’t need to construct my parameter string:

def attributes = flowFile.getAttributes()
// retrieve arguments of the target and split arguments
def arguments = attributes.arguments.tokenize('&')
def method = attributes.method
def base_url = attributes.base_url
def consumerSecret = attributes.oauth_consumer_secret
def tokenSecret = attributes.oauth_token_secret

Then I create a TreeMap in which I add all the parameters I need to construct my parameter string. A TreeMap ensures me that it is sorted on keys in alphabetical order.

TreeMap map = [:]

for (String item : arguments) {
        def (key, value) = item.tokenize('=')
        map.put(key, value)
}

map.put("oauth_consumer_key", attributes.oauth_consumer_key)
map.put("oauth_nonce", attributes.oauth_nonce)
map.put("oauth_signature_method", attributes.oauth_signature_method)
map.put("oauth_timestamp", attributes.oauth_timestamp)
map.put("oauth_token", attributes.oauth_token)
map.put("oauth_version", attributes.oauth_version)

Then I add a method to the String class to allow percent encoding on String objects:

String.metaClass.encode = {
    java.net.URLEncoder.encode(delegate, "UTF-8").replace("+", "%20").replace("*", "%2A").replace("%7E", "~");
}

I am now able to construct the parameter string:

String parameterString = ""

map.each { key, value ->
    parameterString += key.encode()
    parameterString += '='
    parameterString += value.encode()
    parameterString += '&'
}

parameterString = parameterString.substring(0, parameterString.length()-1);

Update: the code above can be simplified as below (see Andy’s comment)

String parameterString = map.collect { String key, String value ->
    "${key.encode()}=${value.encode()}"
}.join("&")

It is now possible to get the signature:

String signatureBaseString = ""
signatureBaseString += method.toUpperCase()
signatureBaseString += '&'
signatureBaseString += base_url.encode()
signatureBaseString += '&'
signatureBaseString += parameterString.encode()

String signingKey = consumerSecret.encode() + '&' + tokenSecret.encode()
String oauthSignature = hmac(signatureBaseString, signingKey)

I may add this information as a new attribute of my FF:

flowFile = session.putAttribute(flowFile, 'oauth_signature', oauthSignature)

Then I can construct the header property value to associate to Authorization key:

String oauth = 'OAuth '
oauth += 'oauth_consumer_key="'
oauth += attributes.oauth_consumer_key.encode()
oauth += '", '
oauth += 'oauth_nonce="'
oauth += attributes.oauth_nonce.encode()
oauth += '", '
oauth += 'oauth_signature="'
oauth += oauthSignature.encode()
oauth += '", '
oauth += 'oauth_signature_method="'
oauth += attributes.oauth_signature_method.encode()
oauth += '", '
oauth += 'oauth_timestamp="'
oauth += attributes.oauth_timestamp.encode()
oauth += '", '
oauth += 'oauth_token="'
oauth += attributes.oauth_token.encode()
oauth += '", '
oauth += 'oauth_version="'
oauth += attributes.oauth_version.encode()
oauth += '"'

I add this information as an attribute (that will be used in the InvokeHTTP processor as we saw before) and I forward my FF to the success relationship:

flowFile = session.putAttribute(flowFile, 'oauth_header', oauth)
session.transfer(flowFile, REL_SUCCESS)

That’s it: I have an operational flow implementing OAuth 1.0A protocol to request against the Twitter API.

The full code is available here as a gist.
The NiFi template is available here.

As always, feel free to ask questions and comment this post!

 

Analyze Flickr user interests using Apache NiFi and Spark

Let’s have some fun with Apache NiFi by studying a new use case: analyze a Flickr account to get some information regarding what kind of pictures the owner likes.

Before getting started, have a look on my previous posts if it is the first time you hear about NiFi. Also let me remind you that Flickr is one website where people can share their pictures. In addition, users can add pictures of others users as “favorites” (similar to a like on Facebook) and add other users as “contacts” to get all their recently published photos on their personal feed.

OK, so now I assume you know a little about Flickr. The nice thing is that there is an API exposed by this website to exchange information with it. To use it you need to request an API key. Once done, you’ll have a key and secret passphrase. You will need this information to request Flickr through Apache NiFi.

Let’s move to the global idea of what we are going to do: for a given account user we are going to get all the photos published by this user, all the photos liked (favorites) by this user, and most recent photos published by users followed by our guy. Next we will get all tags and groups associated to each photo. We will aggregate all this data and perform a word count to get an idea of what our initial user is looking for when using Flickr. Does it sound interesting? OK, let’s go then.

First, if you want to have details about the Flickr API and all exposed methods, go on this page. In our case, we are going to only use methods that do not require authentication (we are not going to fetch data that requires the authorization of the concerned user). The methods are:

  • flickr.people.getPhotos : to get photos published by a given user
  • flickr.favorites.getList : to get favorite photos of a given user
  • flickr.photos.getContactsPublicPhotos : to get recently published photos by contacts of a given user
  • flickr.tags.getListPhoto : to get the list of tags for a given photo
  • flickr.photos.getAllContexts : to get the list of groups (or pools) for a given photo

Note: regarding the photos published by contacts, we are only going to fetch the last 50 photos published by each contact of our user. It would be possible to have a workaround and to get all the pictures of each contact, but this could represent millions of photos (and consequently millions of API calls) and it could potentially deform the results of the analysis since the number of published photos can vary a lot between users in terms of quantity but also in terms of frequency. So let’s focus on a sample of most recent pictures.

To “launch” our flow, I will start with a GenerateFlowFile processor to have only one Flow File (FF) created. Then I will use an UpdateAttribute processor to add attributes to my FF in order to store global properties that will be used along the flow:

  • User ID (or NSID) of the user account we want to analyze
  • API key to use
  • Page (set to 1) that will be used to handle pagination of answers
  • Format (set to json) to get JSON answers from the Flickr API

flickr_properties

Then I use InvokeHTTP processors to perform calls to the FlickrAPI. It will almost be the same for each processor of this kind, so I will only describe one. Let’s describe the processor used to get all favorite pictures of our user.

The only parameter I am going to change is the Remote URL. This parameter is accepting expression language so I can reference attributes of my FF. I set the Remote URL to:

https://api.flickr.com/services/rest/?method=flickr.favorites.getList&api_key=${apiKey}&user_id=${userId}&per_page=500&page=${page}&format=${responseFormat}

As you can see, I give the method of the API I want to request, and also the expected parameters for this method. In this case, the API key, the user ID, the number of elements by page returned by the API (500 is the maximum authorized), the page number I want to request and the returned format.

The result will look like:

jsonFlickrApi({"photos":{"page":1,"pages":10,"perpage":500,"total":"4559","photo":[{"id":"26164116671","owner":"100524190@N04","secret":"3f80e32734","server":"1653","farm":2,"title":"STAIRWAY TO HEAVEN","ispublic":1,"isfriend":0,"isfamily":0,"date_faved":"1459772801"}]},"stat":"ok"})

As you can see, the Flickr API returns a string which is not directly a ready-to-use JSON string. So I add an intermediary step where I remove “jsonFlickrApi(” at the beginning and “)” at the end of the FF content using two ReplaceText processors.

Then I want a system to handle the pagination aspect. For that I create a process group with one input port where I’ll send my FF, and with two output ports (I’ll come back to this very soon).

In this processor group, I first use an EvaluateJsonPath processor to extract “page” and “pages” from the JSON string and get the values as attributes of my FF (you notice that this will update my already existing attribute “page”).

evaluate_page

 

Then I use a RouteOnAttribute processor and I use the Expression language to check if the current page number (attribute “page”) is equal to the total number of pages (attribute “pages”).

page_equal_pages

Whether it is true or not, I route my FF to the first input port that will send my FF to the next step of the flow. But if false, I use an UpdateAttribute processor to increment the attribute “page” to get the new page number I must request:

increment

Note: I also update the “filename” attribute to ensure my FF have different filenames. It is useful when using PutFile processor along the flow for debug purposes.

Then I send this FF in the second output port which goes back to the InvokeHTTP processor but with a new page number. This way, it will loop over all pages to get all expected results.

The processor group looks like:

processor_group

I use the same logic for the photos published by the user and for the photos published by the user’s contacts. At this moment, I can use a funnel to regroup all my results : I am ready to continue at photo level. So far the flow looks like:

flow_first_part

From this point, I use a SplitJson processor to get individual photo information:

splitjson

Then I use an EvaluateJsonPath to extract the photo ID of each photo and to get it as attribute of my FF:

extractPhotoId

I am now able to call the two methods to get tags and groups associated to the picture. I use the following Remote URLs:

Then, I clean once more time the returned string to get a correct JSON string and I call again SplitJson processors to get each tag and group into individual FFs.

Splitting list of tags:

splittags

Splitting list of groups:

splitgroup

Then I use an EvaluateJsonPath to extract tag name and group name as my new FF content:

tagname

At the end, I use a MergeContent processor to concatenate my FFs into bigger FFs with a tag/group name by line. For this processor I use the following parameters:

mergecontent

Note: for the demarcator parameter, I set Shift+Enter for the carriage return.

This way, I have FF of 20k lines except if my incoming queue did not reach this limit in less than 5 minutes.

I end my flow with a PutFile processor to store the results in a given directory.

The second part of the flow looks like:

flow_snd_part

That’s it! I have completed all the flow!

I ran this flow for a given user, it represented about 100k photos with about 450k tag/group names to analyze. To do that, I used Apache Spark and the famous word count example to extract most used keywords.

Once pyspark launched, I used the following commands:

import re
data = sc.textFile("D:/tmp/*")
data.flatMap(lambda line: re.sub('[^0-9a-zA-Z ]+', '', line).lower().split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a+b)
    .map(lambda (x,y): (y,x))
    .sortByKey(False)
    .take(200)

You can notice that I did some cleaning on the characters and changed my strings to lower case. Once I have the 200 most used words, I do some manual cleaning to remove all unwanted things like “flickr”, “photo”, “the”, etc. And I finally get the 20-most used words related to the pictures I got through my flow for my given user:

word_count_result

I hope you enjoyed going through this use case, and I am sure it will give you some ideas to have fun with Apache NiFi ! Please feel free to comment or to ask questions about this post.