Screen Shot 2016-08-13 at 8.18.54 PM

Scaling up/down a NiFi cluster

Scaling up – add a new node in the cluster

You have a NiFi cluster and you are willing to increase the throughput by adding a new node? Here is a way to do it without restarting the cluster. This post starts with a 3-nodes secured cluster with embedded ZooKeeper and we are going to add a new node to this cluster (and this new node won’t have an embedded ZK).

The node I’m going to add is a CentOS 7 virtual machine running an Apache NiFi 1.1.0 instance. All prerequisites are met (network, firewall, java, etc) and I uncompressed the NiFi and NiFi toolkit binaries archives.

First thing to do is, if you don’t have DNS resolution, to update the /etc/hosts file on all the nodes so that each node can resolve the others.

Then, I want to issue a certificate signed by my Certificate Authority for my new node. For this purpose, I use the NiFi toolkit and issue a CSR against my running CA server (see secured cluster setup). I created a directory /etc/nifi, and from this directory I ran the command:

.../bin/tls-toolkit.sh client -c node-3 -t myTokenToUseToPreventMITM -p 9999

node-3 being the node where my CA server is running.

It generates:

  • config.json
  • keystore.jks
  • nifi-cert.pem
  • truststore.jks

Then I can configure my new node with the following properties in ./conf/nifi.properties (to match the configuration used on the running cluster):

 

nifi.cluster.protocol.is.secure=true
nifi.cluster.is.node=true
nifi.cluster.node.address=node-4
nifi.cluster.node.protocol.port=9998
nifi.remote.input.host=node-4
nifi.remote.input.secure=true
nifi.remote.input.socket.port=9997
nifi.web.https.host=node-4
nifi.web.https.port=8443
nifi.zookeeper.connect.string=node-1:2181,node-2:2181,node-3:2181

And the following properties based on the information from the generated config.json file:

nifi.security.keystore=/etc/nifi/keystore.jks
nifi.security.keystoreType=jks
nifi.security.keystorePasswd=JBJUu5MGhGdBj6pTZkZaBR2jRijtcjEMlwqnqc9WkMk
nifi.security.keyPasswd=JBJUu5MGhGdBj6pTZkZaBR2jRijtcjEMlwqnqc9WkMk
nifi.security.truststore=/etc/nifi/truststore.jks
nifi.security.truststoreType=jks
nifi.security.truststorePasswd=gQ7Bw9kbkbr/HLZMxyIvq3jtMeTkoZ/0anD1ygbtSt0
nifi.security.needClientAuth=true

Then we need to configure the ./conf/authorizers.xml file to specify the initial admin identity and the nodes identities:

<authorizers>
    <authorizer>
        <identifier>file-provider</identifier>
        <class>org.apache.nifi.authorization.FileAuthorizer</class>
        <property name="Authorizations File">./conf/authorizations.xml</property>
        <property name="Users File">./conf/users.xml</property>
        <property name="Initial Admin Identity">CN=pvillard, OU=NIFI</property>
        <property name="Legacy Authorized Users File"></property>

        <property name="Node Identity 1">CN=node-1, OU=NIFI</property>
        <property name="Node Identity 2">CN=node-2, OU=NIFI</property>
        <property name="Node Identity 3">CN=node-3, OU=NIFI</property>
    </authorizer>
</authorizers>

IMPORTANT: the authorizers.xml file must match the file that is on the other nodes. We do not specify the Node Identity of the new node, this will be handled when the node is joining the cluster. Also, before starting your new node, delete the ./conf/flow.xml.gz file so that this new node will pick up the current flow definition from the running cluster.

At this point, you can start the new node:

./bin/nifi.sh start && tail -f ./logs/nifi-app.log

And you will be able to see in the cluster view in the NiFi UI that your new node has joined the cluster:

screen-shot-2016-11-29-at-9-19-37-pm

You now need to add the new user corresponding to the new node:

screen-shot-2016-11-29-at-9-20-44-pm

screen-shot-2016-11-29-at-9-21-20-pm

And then update the policies to grant proxy “user requests” write access to the new node.

screen-shot-2016-11-29-at-9-21-37-pm

Now I want to confirm that my new node is used in the workflow processing. I am going to create a simple workflow that generates flow files on primary node, distribute the load on all the nodes of the cluster and store the generated files in /tmp. To do that, I first need to grant me the rights to modify the root process group content:

Click on the key symbol on the canvas and grant yourself all the access.

screen-shot-2016-11-29-at-9-26-59-pm

Then in order to simplify the next tasks (grant permissions to nodes for site-to-site communications), I recommend you to create a group called “nodes” and to add all the nodes user inside this group:

screen-shot-2016-11-29-at-9-42-19-pm

Then I create my workflow with a Remote Process Group and an Input Port to ensure the load balancing:

screen-shot-2016-11-29-at-9-43-48-pm

And I also specify that my GenerateFlowFile processor is running on primary node only:

screen-shot-2016-11-29-at-9-44-05-pm

Besides, I add specific rights to the Input Port to allow site-to-site communication between nodes. To do that, select the Input Port, then click on the key symbol in the Operate Panel and grant “receive site-to-site data” to the “nodes” group.

I can now start my workflow and confirm that data is stored in /tmp on each one of my nodes. I now have a new node participating in the execution of the workflow by the NiFi cluster!

Scaling down – decommission a node of the cluster

OK… now we want to check how a node can be decommissioned. There is a lot of reasons to be in this situation but the most common ones are:

  • upgrade the cluster in a rolling fashion (decommission a node, upgrade the node, move back the node in the cluster and move on to the next one)
  • add a new processor NAR to the cluster (decommission a node, add NAR in the node’s library, restart NiFi on this node, move back the node in the cluster and move on to the next one)

To do that, you just need to go in the cluster view, disconnect your node, stop your node, perform your modifications and restart the node to get it back in the cluster. Coordinator and primary roles will be automatically assigned to a new node if needed.

But let’s say we are in more a “destructive” situation (removing a node for good) and we want to ensure that the data currently processed by the node we are decommissioning is correctly processed before shutting down NiFi on this node.For this purpose, I am going to change the end point of my workflow with a PutHDFS instead of a PutFile and send the data to an external cluster. The objective is to check that all the generated data is correctly going into HDFS without any issue.Here is my workflow:screen-shot-2016-11-30-at-9-16-54-amAnd here is the current status of my cluster:screen-shot-2016-11-30-at-9-18-29-amMy GenerateFlowFile is running on the primary node only (node-1) and I’ll generate a file every 100ms. I’ll decommission the node-1 to also demonstrate the change of roles.

As a note: when you define a remote process group, you need to specify one of the nodes of the remote cluster you want to send data to. This specific node is only used when you are starting the remote process group to connect to the remote cluster. Once done, the RPG will be aware of all the nodes of the remote cluster and will be able to send data to all the nodes even though the specified remote node is disconnected. In a short future, it’ll be possible to specify multiple remote nodes in order to be more resilient (in case the RPG is stopped/restarted and the specified remote node is no more available).

To decommission a node, I only need to click on the “disconnect” button on the cluster view for the node I want to disconnect.Before disconnecting my node, I can check that my workflow is running correctly and I have flow files queued in my relationships:screen-shot-2016-11-30-at-9-31-46-amI can now disconnect my node-1 and check what is going on. Roles have been reassigned and the workflow is still running:screen-shot-2016-11-30-at-9-33-43-amscreen-shot-2016-11-30-at-9-33-55-amNote that, as I explained, the remote process group is still working as expected even though it was using node-1 as remote node (see note above).I can now access the UI of the node I just disconnected and check the current status of this node (https://node-1:8443/nifi). When I access the node, I’ve the following warning:screen-shot-2016-11-30-at-9-35-15-amAnd I can see that the workflow is still running on this node. This way, I can wait until all flow files currently on this node are processed:screen-shot-2016-11-30-at-9-35-27-amMy GenerateFlowFile is not running anymore since my node lost its role of Primary Node, and we can wait for the end of the processing of all flow files on this node. Once all flow files are processed, we have the guarantee that there will be no data loss and we can now shutdown the node if needed.If you want to reconnect it once you have performed your updates, you just need to go on the UI (from a node in the cluster), go in the cluster view and reconnect the node:screen-shot-2016-11-30-at-9-33-43-amThe node is now reconnected and used in the workflow processing:screen-shot-2016-11-30-at-9-43-24-amI can check that all the files I’ve generated have been transferred, as expected, on my HDFS cluster.screen-shot-2016-11-30-at-9-45-25-amThat’s all for this blog! As you can see, scaling up and down your NiFi cluster is really easy and it gives you the opportunity to ensure no service interruption.As always, comments and suggestions are welcomed.

 

Screen Shot 2016-08-13 at 8.18.54 PM

Apache NiFi 1.1.0 – Secured cluster setup

Apache NiFi 1.1.0 is now out, and I want to discuss a specific subject in a couple of posts: how to scale up and down a NiFi cluster without loosing data? Before going into this subject, I want to setup a 3-nodes secured cluster using the NiFi toolkit. It will be my starting point to scale up my cluster with an additional node, and then scale down my cluster.

There are already great posts describing how to setup a secured cluster using embedded ZK and taking advantage of the NiFi toolkit for the certificates, so I won’t go in too much details. For reference, here are some great articles I recommend:

OK… let’s move on. My initial setup is the following:

  • My OS X laptop
  • 2 CentOS 7 VM

All nodes have required prerequisites (network, java, etc), have the NiFi 1.1.0 binary archive uncompressed available, and have the NiFi toolkit 1.1.0 binary archive uncompressed available.

On my OS X laptop, I will use the NiFi TLS toolkit in server mode so that it acts as a Certificate Authority that can be used by clients to get Certificates.

Here is the description of how to use the TLS toolkit in server mode:

./bin/tls-toolkit.sh server

usage: org.apache.nifi.toolkit.tls.TlsToolkitMain [-a <arg>] [-c <arg>] [--configJsonIn <arg>] [-d <arg>] [-D <arg>] [-f <arg>] [-F] [-g] [-h] [-k <arg>] [-p
       <arg>] [-s <arg>] [-T <arg>] [-t <arg>]

Acts as a Certificate Authority that can be used by clients to get Certificates

 -a,--keyAlgorithm <arg>                   Algorithm to use for generated keys. (default: RSA)
 -c,--certificateAuthorityHostname <arg>   Hostname of NiFi Certificate Authority (default: localhost)
    --configJsonIn <arg>                   The place to read configuration info from (defaults to the value of configJson), implies useConfigJson if set.
                                           (default: configJson value)
 -d,--days <arg>                           Number of days issued certificate should be valid for. (default: 1095)
 -D,--dn <arg>                             The dn to use for the CA certificate (default: CN=YOUR_CA_HOSTNAME,OU=NIFI)
 -f,--configJson <arg>                     The place to write configuration info (default: config.json)
 -F,--useConfigJson                        Flag specifying that all configuration is read from configJson to facilitate automated use (otherwise configJson will
                                           only be written to.
 -g,--differentKeyAndKeystorePasswords     Use different generated password for the key and the keyStore.
 -h,--help                                 Print help and exit.
 -k,--keySize <arg>                        Number of bits for generated keys. (default: 2048)
 -p,--PORT <arg>                           The port for the Certificate Authority to listen on (default: 8443)
 -s,--signingAlgorithm <arg>               Algorithm to use for signing certificates. (default: SHA256WITHRSA)
 -T,--keyStoreType <arg>                   The type of keyStores to generate. (default: jks)
 -t,--token <arg>                          The token to use to prevent MITM (required and must be same as one used by clients)

In my case, I run the TLS toolkit on my node-3, and I run the following command:

./bin/tls-toolkit.sh server -c node-3 -t myTokenToUseToPreventMITM -p 9999

On each of my node, I created a nifi directory in /etc and I ran a command using the toolkit to get my signed certificates generated into my current directory:

.../bin/tls-toolkit.sh client -c node-3 -t myTokenToUseToPreventMITM -p 9999

And I have the following generated files on each of my nodes:

  • config.json
  • keystore.jks
  • nifi-cert.pem
  • truststore.jks

I now configure ZooKeeper as described here. Here is a short list of the tasks (in bold what slightly changed in comparison with my previous post):

  • configure conf/zookeeper.properties to list the ZK nodes
  • configure ZK state ID file
  • set nifi.state.management.embedded.zookeeper.start
  • set nifi.zookeeper.connect.string
  • set nifi.cluster.protocol.is.secure=true
  • set nifi.cluster.is.node=true
  • set nifi.cluster.node.address=node-<1-3>
  • set nifi.cluster.node.protocol.port=9998
  • set nifi.remote.input.host=node-<1-3>
  • set nifi.remote.input.secure=true
  • set nifi.remote.input.socket.port=9997
  • set nifi.web.https.host=node-<1-3>
  • set nifi.web.https.port=8443

(ports are changed to ensure there is no conflict with the TLS toolkit running the CA server)

I now configure the following properties with what has been generated by the toolkit:

nifi.security.keystore=
nifi.security.keystoreType=
nifi.security.keystorePasswd=
nifi.security.keyPasswd=
nifi.security.truststore=
nifi.security.truststoreType=
nifi.security.truststorePasswd=
nifi.security.needClientAuth=

For one of my node, it gives:

nifi.security.keystore=/etc/nifi/keystore.jks
nifi.security.keystoreType=jks
nifi.security.keystorePasswd=4nVzULZ+CBiPCePvFSCw4LDAvzNoumAqu+TcyeDQ1ac
nifi.security.keyPasswd=4nVzULZ+CBiPCePvFSCw4LDAvzNoumAqu+TcyeDQ1ac
nifi.security.truststore=/etc/nifi/truststore.jks
nifi.security.truststoreType=jks
nifi.security.truststorePasswd=0fFJ+pd4qkso0jC0jh7w7tLPPRSINYI6of+KnRBRVSw
nifi.security.needClientAuth=true

Then I generate a certificate for myself as a client to be able to authenticate against NiFi UI:

.../bin/tls-toolkit.sh client -c node-3 -t myTokenToUseToPreventMITM -p 9999 -D "CN=pvillard,OU=NIFI" -T PKCS12

Don’t forget the -T option to get your client certificate in a format that is easy to import in your browser (PKCS12). This command also generates a nifi-cert.pem file that corresponds to the CA certificate, you will need to import it in your browser as well (and you might need to manually update the trust level on this certificate to ensure you have access to the UI).

At this point I’m able to fill the authorizers.xml file. I need to specify myself as initial admin identity (to access the UI with full administration rights), and specify each nodes of my cluster (using the DN provided with the generated certificates). It gives:

<authorizers>
    <authorizer>
        <identifier>file-provider</identifier>
        <class>org.apache.nifi.authorization.FileAuthorizer</class>
        <property name="Authorizations File">./conf/authorizations.xml</property>
        <property name="Users File">./conf/users.xml</property>
        <property name="Initial Admin Identity">CN=pvillard, OU=NIFI</property>
        <property name="Legacy Authorized Users File"></property>

        <property name="Node Identity 1">CN=node-1, OU=NIFI</property>
        <property name="Node Identity 2">CN=node-2, OU=NIFI</property>
        <property name="Node Identity 3">CN=node-3, OU=NIFI</property>
    </authorizer>
</authorizers>

WARNING – Please be careful when updating this file because identities are case-sensitive and blank-sensitive. For example, even though I specified

-D "CN=pvillard,OU=NIFI"

when executing the command to generate the certificates, it introduced a white space after the comma. The correct string to use in the configuration file is given in the output of the TLS toolkit when executing the command.

Once I’ve updated this file on each node, I’m now ready to start each node of the cluster.

./bin/nifi.sh start && tail -f ./logs/nifi-app.log

Once all nodes are correctly started, I am now able to access the NiFi UI using any of the nodes in the cluster:

NiFi UI / 3-nodes secured cluster
NiFi UI / 3-nodes secured cluster

That’s all! Next post will use the current environment as a starting point to demonstrate how to scale up/down a NiFi cluster.

Screen Shot 2016-08-13 at 8.18.54 PM

Apache NiFi 1.0.0 – Cluster setup

As you may know a version 1.0.0-BETA of Apache NiFi has been released few days ago. The upcoming 1.0.0 release will be a great moment for the community as it it will mark a lot of work over the last few months with many new features being added.

The objective of the Beta release is to give people a chance to try this new version and to give a feedback before the official major release which will come shortly. If you want to preview this new version with a completely new look, you can download the binaries here, unzip it, and run it (‘./bin/nifi.sh start‘ or ‘./bin/run-nifi.bat‘ for Windows), then you just have to access http://localhost:8080/nifi/.

The objective of this post is to briefly explain how to setup an unsecured NiFi cluster with this new version (a post for setting up a secured cluster will come shortly with explanations on how to use a new tool that will be shipped with NiFi to ease the installation of a secured cluster).

One really important change with this new version is the new paradigm around cluster installation. From the NiFi documentation, we can read:

Starting with the NiFi 1.0 release, NiFi employs a Zero-Master Clustering paradigm. Each of the nodes in a NiFi cluster performs the same tasks on the data but each operates on a different set of data. Apache ZooKeeper elects one of the nodes as the Cluster Coordinator, and failover is handled automatically by ZooKeeper. All cluster nodes report heartbeat and status information to the Cluster Coordinator. The Cluster Coordinator is responsible for disconnecting and connecting nodes. As a DataFlow manager, you can interact with the NiFi cluster through the UI of any node in the cluster. Any change you make is replicated to all nodes in the cluster, allowing for multiple entry points to the cluster.

zero-master-cluster

OK, let’s start with the installation. As you may know it is greatly recommended to use an odd number of ZooKeeper instances with at least 3 nodes (to maintain a majority also called quorum). NiFi comes with an embedded instance of ZooKeeper, but you are free to use an existing cluster of ZooKeeper instances if you want. In this article, we will use the embedded ZooKeeper option.

I will use my computer as the first instance. I also launched two virtual machines (with a minimal Centos 7). All my 3 instances are able to communicate to each other on requested ports. On each machine, I configure my /etc/hosts file with:

192.168.1.17 node-3
192.168.56.101 node-2
192.168.56.102 node-1

I deploy the binaries file on my three instances and unzip it. I now have a NiFi directory on each one of my nodes.

The first thing is to configure the list of the ZK (ZooKeeper) instances in the configuration file ‘./conf/zookeep.properties‘. Since our three NiFi instances will run the embedded ZK instance, I just have to complete the file with the following properties:

server.1=node-1:2888:3888
server.2=node-2:2888:3888
server.3=node-3:2888:3888

Then, everything happens in the ‘./conf/nifi.properties‘. First, I specify that NiFi must run an embedded ZK instance, with the following property:

nifi.state.management.embedded.zookeeper.start=true

I also specify the ZK connect string:

nifi.zookeeper.connect.string=node-1:2181,node-2:2181,node-3:2181

As you can notice, the ./conf/zookeeper.properties file has a property named dataDir. By default, this value is set to ./state/zookeeper. If more than one NiFi node is running an embedded ZK, it is important to tell the server which one it is.

To do that, you need to create a file name myid and placing it in ZK’s data directory. The content of this file should be the index of the server as previously specify by the server.<number> property.

On node-1, I’ll do:

mkdir ./state
mkdir ./state/zookeeper
echo 1 > ./state/zookeeper/myid

The same operation needs to be done on each node (don’t forget to change the ID).

If you don’t do this, you may see the following kind of exceptions in the logs:

Caused by: java.lang.IllegalArgumentException: ./state/zookeeper/myid file is missing

Then we go to clustering properties. For this article, we are setting up an unsecured cluster, so we must keep:

nifi.cluster.protocol.is.secure=false

Then, we have the following properties:

nifi.cluster.is.node=true
nifi.cluster.node.address=node-1
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

I set the FQDN of the node I am configuring, and I choose the arbitrary 9999 port for the communication with the elected cluster coordinator. I apply the same configuration on my other nodes:

nifi.cluster.is.node=true
nifi.cluster.node.address=node-2
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

and

nifi.cluster.is.node=true
nifi.cluster.node.address=node-3
nifi.cluster.node.protocol.port=9999
nifi.cluster.node.protocol.threads=10
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=5 sec
nifi.cluster.node.read.timeout=5 sec
nifi.cluster.firewall.file=

We have configured the exchanges between the nodes and the cluster coordinator, now let’s move to the exchanges between the nodes (to balance the data of the flows). We have the following properties:

nifi.remote.input.host=node-1
nifi.remote.input.secure=false
nifi.remote.input.socket.port=9998
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec

Again, I set the FQDN of the node I am configuring and I choose the arbitrary 9998 port for the Site-to-Site (S2S) exchanges between the nodes of my cluster. The same applies for all the nodes (just change the host property with the correct FQDN).

It is also important to set the FQDN for the web server property, otherwise we may get strange behaviors with all nodes identified as ‘localhost’ in the UI. Consequently, for each node, set the following property with the correct FQDN:

nifi.web.http.host=node-1

And that’s all! Easy, isn’t it?

OK, let’s start our nodes and let’s tail the logs to see what’s going on there!

./bin/nifi.sh start && tail -f ./logs/nifi-app.log

If you look at the logs, you should see that one of the node gets elected as the cluster coordinator and then you should see heartbeats created by the three nodes and sent to the cluster coordinator every 5 seconds.

You can connect to the UI using the node you want (you can have multiple users connected to different nodes, modifications will be applied on each node). Let’s go to:

http://node-2:8080/nifi

Here is what it looks like:

Screen Shot 2016-08-13 at 7.33.08 PM

As you can see in the top-left corner, there are 3 nodes in our cluster. Besides, if we go in the menu (button in the top-right corner) and select the cluster page, we have details on our three nodes:

Screen Shot 2016-08-13 at 7.35.28 PM

We see that my node-2 has been elected as cluster coordinator, and that my node-3 is my primary node. This distinction is important because some processors must run on a unique node (for data consistency) and in this case we will want it to run “On primary node” (example below).

We can display details on a specific node (“information” icon on the left):

Screen Shot 2016-08-13 at 7.35.48 PM

OK, let’s add a processor like GetTwitter. Since the flow will run on all nodes (with balanced data between the nodes), this processor must run on a unique processor if we don’t want to duplicate data. Then, in the scheduling strategy, we will choose the strategy “On primary node”. This way, we don’t duplicate data, and if the primary node changes (because my node dies or gets disconnected), we won’t loose data, the workflow will still be executed.

Screen Shot 2016-08-13 at 7.45.19 PM

Then I can connect my processor to a PutFile processor to save the tweets in JSON by setting a local directory (/tmp/twitter):

Screen Shot 2016-08-13 at 7.52.25 PM

If I run this flow, all my JSON tweets will be stored on the primary node, the data won’t be balanced. To balance the data, I need to use a RPG (Remote Process Group), the RPG will connect to the coordinator to evaluate the load of each node and balance the data over the nodes. It gives us the following flow:

Screen Shot 2016-08-13 at 8.00.26 PM

I have added an input port called “RPG”, then I have added a Remote Process Group that I connected to ‘http://node-2:8080/nifi&#8217; and I enabled transmission so that the Remote Process Group was aware of the existing input ports on my cluster. Then in the Remote Process Group configuration, I enabled the RPG input port. I then connected my GetTwitter to the Remote Process Group and selected the RPG input port. Finally, I connected my RPG input port to my PutFile processor.

When running the flow, I now have balanced data all over my nodes (I can check in the local directory ‘/tmp/twitter‘ on each node).

That’s all for this post. I hope you enjoyed it and that it will be helpful for you if setting up a NiFi cluster. All comments/remarks are very welcomed and I kindly encourage you to download Apache NiFi, to try it and to give a feedback to the community if you have any.

minifi-logo

Apache NiFi – MiNiFi is (almost) out!

This is quite a busy period for Apache NiFi community: Apache NiFi 0.7.0 is about to be released (RC2 is coming), Apache NiFi 1.0.0 will probably be ready in the next few weeks (stay tuned) and… Apache MiNiFi 0.0.1 will officially be released next week (RC vote in progress and so far so good)! This a great step for the community and I wanted to write a quick article about this new amazing tool!

Here is my step-by-step article to get hands on MiNiFi and play with it. But first of all, if you want to know more about this great subproject, have a look here: https://nifi.apache.org/minifi/index.html

  • Build MiNiFi from sources

I downloaded the sources from this link: https://dist.apache.org/repos/dist/dev/nifi/minifi-0.0.1/minifi-0.0.1-source-release.zip

And built the sources using maven: mvn clean install.

There are two convenience binaries generated as part of this process.  The
MiNiFi assembly and a MiNiFi Toolkit assembly.

  • Convert and validate MiNiFi templates

The toolkit can be used to convert a NiFi template (XML) into a MiNiFi template (YAML) or to validate a MiNiFi template.

For this process, I created a very simple template using  NiFi. This template is made of a GenerateFlowFile processor to generate flow files with no content every 5 seconds, and an AttributeToJson processor to extract attributes of the flow file and generate a JSON in the flow file content. Then it is connected to a Remote Process Group pointing to a local running NiFi instance configured to allow Site-to-Site communication.

minifi-flow

I then saved this template as a XML file (NiFi template).

If I look at the toolkit usage (in a Windows environment):

minifi-toolkit-0.0.1\bin> .\config.bat
Usage:

java org.apache.nifi.minifi.toolkit.configuration.ConfigMain <command> options

Valid commands include:
transform: Transform template xml into MiNiFi config YAML
validate: Validate config YAML

I now use the toolkit to convert my template:

.\config.bat transform MiNiFi-test.xml MiNiFi-test.yml

And I can also validate the generated configuration file:

.\config.bat validate MiNiFi-test.yml

Note: here is the working configuration I then used in MiNiFi.

  • Run MiNiFi

I now take the generated configuration and use it to replace the default one:

minifi-0.0.1\conf\config.yml

And I now start MiNiFi:

minifi-0.0.1\bin\run-minifi.bat

By looking at the logs (minifi-0.0.1\logs\minifi-app.log), we can see all the processors that are shipped with MiNiFi. Here is a list of the currently standard processors included with MiNiFi:

    org.apache.nifi.processors.standard.PostHTTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.RouteOnContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.FetchFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.EvaluateXPath || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.SplitContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListSFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ReplaceText || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.MergeContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ConvertCharacterSet || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutDistributedMapCache || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.HandleHttpRequest || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutSyslog || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.CompressContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ParseSyslog || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.RouteOnAttribute || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ModifyBytes || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ControlRate || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.HashAttribute || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.Base64EncodeContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.TailFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetHTTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.HashContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.EvaluateXQuery || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.IdentifyMimeType || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetJMSQueue || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListenTCP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.FetchDistributedMapCache || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutJMS || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.SplitXml || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.EvaluateRegularExpression || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListenSyslog || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ScanContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ConvertJSONToSQL || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.EncryptContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.FetchSFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.attributes.UpdateAttribute || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-update-attribute-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetJMSTopic || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ReplaceTextWithMapping || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.SplitJson || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.TransformXml || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.EvaluateJsonPath || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ExecuteProcess || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.MonitorActivity || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ValidateXml || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ExecuteSQL || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.SegmentContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutSFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ExecuteStreamCommand || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.LogAttribute || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.DistributeLoad || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GenerateFlowFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListenHTTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListenUDP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutSQL || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.RouteText || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ListenRELP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.InvokeHTTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ExtractText || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.UnpackContent || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.AttributesToJSON || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.PutEmail || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.DetectDuplicate || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.ScanAttribute || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.SplitText || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.GetSFTP || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.HandleHttpResponse || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.DuplicateFlowFile || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]
    org.apache.nifi.processors.standard.QueryDatabaseTable || org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\minifi-standard-nar-0.0.1.nar-unpacked]

As you can see, we can, with a default installation, do a lot of amazing things!!!

Once MiNiFi is started, the configured flow will be automatically started, and I am now able to see in my running NiFi instance all the files I received from the MiNiFi instance through Site-to-Site communication into the input port I configured:

nifi-with-minifi

For the purpose of the demonstration, the above screenshot shows all the processors involved in this demonstration:

  • I used the red-circled elements to generate a NiFi template, convert it into a MiNiFi template and get it running into a MiNiFi 0.0.1 instance.
  • The green-circled part is the only part really running on my NiFi instance and it is receiving all the flow files generated by the MiNiFi instance.

That’s all for this quick demonstration of this new tool. Hope you enjoyed it and that you will give it a try!

  • Conclusion

MiNiFi is a very lightweight tool (40Mo !) that can be easily deployed on a lot of remote machines/servers as a single running instance to collect and quickly process data or to be a remote relay to communicate with NiFi. Perspectives of the role of MiNiFi should be from the perspective of the agent acting immediately at, or directly adjacent to, source sensors, systems, or servers.

It is really easy to use and deploy and will, hopefully, be widely used in the Internet of Things! Besides, it comes with the top level security features coming with NiFi! A must!

sentimentPresFeature

US presidential election via Twitter using Apache NiFi, Spark, Hive and Zeppelin

This article describes a frequency and sentiment analysis based on real-time tweets streams in relation to the four main candidates in the US Presidential Election.

The main objective was to deploy and to test the available connector between Apache NiFi and Apache Spark, so I decided to implement the following use case:

At the end, I get real time analytics such as:

  • frequency of tweets along the time per candidate
  • percentage of negative, positive and neutral tweets per candidate
  • opinion trends along the time for each candidate

The article is available on Hortonworks Community Connection website. And as always, please feel free to comment and/or ask questions.

OAuth 1.0A with Apache NiFi (Twitter API example)

A lot of API are using OAuth  protocol to authorize the received requests and to check if everything is OK regarding the identity of the request sender.

OAuth is an open standard for authorization, commonly used as a way for Internet users to log into third party websites using their Microsoft, Google, Facebook, Twitter, One Network etc. accounts without exposing their password. Generally, OAuth provides to clients a “secure delegated access” to server resources on behalf of a resource owner. It specifies a process for resource owners to authorize third-party access to their server resources without sharing their credentials. Designed specifically to work with Hypertext Transfer Protocol (HTTP), OAuth essentially allows access tokens to be issued to third-party clients by an authorization server, with the approval of the resource owner. The third party then uses the access token to access the protected resources hosted by the resource server.

As a remark, there are two versions of the protocol currently used out there: 1.0A and 2.0. As far as I know, 1.0A is the most commonly used. I already faced the need to use OAuth 1.0A protocol with the Flickr API but, back then, I found a way to get my data differently.

Recently, a question was asked on the Hortonworks Community Connection regarding the use of Apache NiFi to get data from Twitter API using OAuth 1.0A protocol. So this time, I decided to have a look on this and to get the job done.

This post presents the flow I used to perform a request against Twitter API using OAuth protocol. It gives me the opportunity to use for the first time the ExecuteScript processor which allows user to execute custom scripts on the fly inside NiFi (you will find a lot of examples on this great site).

Note 1: this was the first time I used Groovy language, be nice with me!

Note 2: I didn’t test the flow on a lot of methods. Some modifications may be necessary for some cases.

OK. The objective was to request the “users/lookup” method of the Twitter API. You can read the documentation here.

I want to perform a HTTP GET on:

https://api.twitter.com/1.1/users/lookup.json?screen_name=twitterapi,twitter

So far it seems really easy to do with a simple InvokeHTTP processor. The thing is you need to identify yourself when sending the request. Here comes the OAuth protocol. The official specification for 1.0A can be read here. But in the case of the Twitter API, you have a nice documentation here.

Besides on the documentation of each method, you have an OAuth Signature Generator that can be accessed (if you have defined a Twitter App). The generator is here. It lets you play around and gives great insights on each request to debug your own implementation of OAuth protocol.

The global idea is: you have a lot of input parameters and you must follow the specifications to construct a string based on the parameters. This string will be the value associated to “Authorization” key in HTTP header properties.

Here is the list of the needed parameters. First the parameters directly linked to your request:

Then the global parameters related to OAuth:

  • Consumer key (private information of your app provided by Twitter)
  • Consumer secret (private information of your app provided by Twitter)
  • Nonce (random string, uniquely generated for each request)
  • Signature method (with Twitter it is HMAC-SHA1)
  • Timestamp (in seconds)
  • Token (private information of your app provided by Twitter)
  • Token secret (private information of your app provided by Twitter)
  • Version (in this case 1.0)

The first step is to construct the “signature base string“. For that you first need to create the “parameter string“. All is very well explained here. Once you have the signature base string, you can encode it using HMAC-SHA1 and you easily get the header property to set in your HTTP request:

Authorization: OAuth oauth_consumer_key="*******", oauth_nonce="a9ab2392e5158a4c4e029c7829164571", oauth_signature="4s4Hi5hQ%2FoLKprW7qsRlImds3Og%3D", oauth_signature_method="HMAC-SHA1", oauth_timestamp="1460453975", oauth_token="*******", oauth_version="1.0"

Let’s get into the details using Apache NiFi. Here is the flow I created:

oauthFlow

I use a GenerateFlowFile to generate an empty Flow File (FF) in order to execute my flow. Then I use an UpdateAttribute processor to add attributes to my FF. In this case, I only add the parameters related to the specific request I want to execute:

globalParam

Then I send my FF into a process group that will compute the header property to set (I will come back to this part later). Then I perform my request using the InvokeHTTP processor:

invokeHTTP

I set the method to GET, the URL to my corresponding FF attribute, the content type to text/plain and I add a custom property named Authorization with the FF attribute I created in my process group (see below). This custom property will be added as a HTTP header in the request. At the end, I use a PutFile processor to write the result of my request in a local directory.

Let’s go to the interesting part of our flow where all the magic is: the process group I named OAuth 1.0A. Here it is:

processGroup

I just use two processors. The first one is an UpdateAttribute to add all the parameters I need as attributes of my FF. the second one is an ExecuteScript processor where I put my groovy code to compute the header property.

First… the parameters:

oauthParameters

Note: I use Expression Language provided by NiFi for some attributes.

  • arguments is used to extract the argument part in my target URL. In this example: screen_name=twitterapi,twitter
  • base_url is the URL I request without any argument. In this example: https://api.twitter.com/1.1/users/lookup.json
  • For the nonce parameter I use the “UUID” method of the expression language which generated a random string and I just take to replace the ‘-‘ characters to only keep an alphanumeric string.
  • For the timestamp, I use the “now” method of the expression language and I use substring to only keep seconds.

Let’s move to the ExecuteScript part. I set the script engine to Groovy and I put my script code in the “script body” property. The full code is available at the end of the post. Let’s go through it piece by piece.

First thing, I want to trigger my code only when a FF is available:

def flowFile = session.get()
if (!flowFile) return

Then I define a method I will use for the HMAC-SHA1 encoding:

def static hmac(String data, String key) throws java.security.SignatureException
{
    String result
    try {
        // get an hmac_sha1 key from the raw key bytes
        SecretKeySpec signingKey = new SecretKeySpec(key.getBytes(), "HmacSHA1");
        // get an hmac_sha1 Mac instance and initialize with the signing key
        Mac mac = Mac.getInstance("HmacSHA1");
        mac.init(signingKey);
        // compute the hmac on input data bytes
        byte[] rawHmac = mac.doFinal(data.getBytes());
        result= rawHmac.encodeBase64()
    } catch (Exception e) {
        throw new SignatureException("Failed to generate HMAC : " + e.getMessage());
    }
    return result
}

For this part, I will need to add some imports at the beginning of my script body:

import java.security.SignatureException
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec

Then I retrieve all the attributes of my FF and I extract some attributes I don’t need to construct my parameter string:

def attributes = flowFile.getAttributes()
// retrieve arguments of the target and split arguments
def arguments = attributes.arguments.tokenize('&')
def method = attributes.method
def base_url = attributes.base_url
def consumerSecret = attributes.oauth_consumer_secret
def tokenSecret = attributes.oauth_token_secret

Then I create a TreeMap in which I add all the parameters I need to construct my parameter string. A TreeMap ensures me that it is sorted on keys in alphabetical order.

TreeMap map = [:]

for (String item : arguments) {
        def (key, value) = item.tokenize('=')
        map.put(key, value)
}

map.put("oauth_consumer_key", attributes.oauth_consumer_key)
map.put("oauth_nonce", attributes.oauth_nonce)
map.put("oauth_signature_method", attributes.oauth_signature_method)
map.put("oauth_timestamp", attributes.oauth_timestamp)
map.put("oauth_token", attributes.oauth_token)
map.put("oauth_version", attributes.oauth_version)

Then I add a method to the String class to allow percent encoding on String objects:

String.metaClass.encode = {
    java.net.URLEncoder.encode(delegate, "UTF-8").replace("+", "%20").replace("*", "%2A").replace("%7E", "~");
}

I am now able to construct the parameter string:

String parameterString = ""

map.each { key, value ->
    parameterString += key.encode()
    parameterString += '='
    parameterString += value.encode()
    parameterString += '&'
}

parameterString = parameterString.substring(0, parameterString.length()-1);

Update: the code above can be simplified as below (see Andy’s comment)

String parameterString = map.collect { String key, String value ->
    "${key.encode()}=${value.encode()}"
}.join("&")

It is now possible to get the signature:

String signatureBaseString = ""
signatureBaseString += method.toUpperCase()
signatureBaseString += '&'
signatureBaseString += base_url.encode()
signatureBaseString += '&'
signatureBaseString += parameterString.encode()

String signingKey = consumerSecret.encode() + '&' + tokenSecret.encode()
String oauthSignature = hmac(signatureBaseString, signingKey)

I may add this information as a new attribute of my FF:

flowFile = session.putAttribute(flowFile, 'oauth_signature', oauthSignature)

Then I can construct the header property value to associate to Authorization key:

String oauth = 'OAuth '
oauth += 'oauth_consumer_key="'
oauth += attributes.oauth_consumer_key.encode()
oauth += '", '
oauth += 'oauth_nonce="'
oauth += attributes.oauth_nonce.encode()
oauth += '", '
oauth += 'oauth_signature="'
oauth += oauthSignature.encode()
oauth += '", '
oauth += 'oauth_signature_method="'
oauth += attributes.oauth_signature_method.encode()
oauth += '", '
oauth += 'oauth_timestamp="'
oauth += attributes.oauth_timestamp.encode()
oauth += '", '
oauth += 'oauth_token="'
oauth += attributes.oauth_token.encode()
oauth += '", '
oauth += 'oauth_version="'
oauth += attributes.oauth_version.encode()
oauth += '"'

I add this information as an attribute (that will be used in the InvokeHTTP processor as we saw before) and I forward my FF to the success relationship:

flowFile = session.putAttribute(flowFile, 'oauth_header', oauth)
session.transfer(flowFile, REL_SUCCESS)

That’s it: I have an operational flow implementing OAuth 1.0A protocol to request against the Twitter API.

The full code is available here as a gist.
The NiFi template is available here.

As always, feel free to ask questions and comment this post!

 

URL shortener service with Apache NiFi

This blog will demonstrate a new use case using Apache NiFi: implement a URL shortener service. Let’s be clear right now, I don’t think Apache NiFi is the best option to propose such a service (this is not the idea behind this Apache project) but I believe this is an opportunity to play around with some processors/functionalities I never discussed so far.

Why this idea? Some months ago, for a job interview, I have been asked to develop a URL shortener service in Go using Docker and Redis. This is available on Github here. Since this is something we can do with Apache NiFi, it is interesting to see how this can be achieved.

Before talking about Apache NiFi (if you don’t know about this project, have a look on my previous posts, this is a great tool!), let’s discuss what we want to achieve…

  • URL shortener service requirements

I want to expose a web service that gives me the opportunity to shorten long URLs in order to be able to share/remember it easily. I also want to store statistics about the number of times my shortened URL has been used. And, in our use case, I want my shortened URL to be valid at least 24h.

  • Example

I want to shorten this URL:

https://pierrevillard.com/2016/04/04/analyze-flickr-account-using-apache/

Let’s say my service is running on my local computer, I am going to access it with my browser:

localhost/shortlink?url=https://pierrevillard.com/2016/04/04/analyze-flickr-account-using-apache/

I will get a web page displaying JSON data:

{"key":"6974","url":"https://pierrevillard.com/2016/04/04/analyze-flickr-account-using-apache/","date":1460238097015,"count":0}

From now on, I will be able to access my URL by using:

localhost/6974

And I will be able to access the statistics (JSON data) of my shortened URL at:

localhost/admin?key=6974

  • Implementation with Apache NiFi

This use case gives me the opportunity to discuss about some nice features of Apache NiFi:

  1. The possibility to expose web services with the use of HandleHttpRequest and HandleHttpResponse processors in combination with a StandardHttpContextMap controller service.The controller service provides the ability to store and retrieve HTTP requests and responses external to a Processor, so that multiple processors can interact with the same HTTP request. In other words, the HandleHttpRequest processor initializes a Jetty web server listening for requests on a given port. Once a request is received, a FlowFile is generated with attributes and content (if any). This request has been received asynchronously so that the FlowFile can be used along a data flow before generating the response to send back to the user using the HandleHttpResponse processor.

    Note: at this moment, for a given listening port, there can be only one instance of the HandleHttpRequest processor on the canvas. Handling different services with the same processor can be performed with the addition of a RouteOnAttribute processor (we’ll see that in this implementation).

  2. The possibility to store information across the NiFi cluster (but also to have a distributed map cache of key/value data) using PutDistributedMapCache and FetchDistributedMapCache processors in combination with a DistributedMapCacheServer and DistributedMapCacheClientService controller services.The map cache server controller service provides a map (key/value) cache that can be accessed over a socket. Interaction with this service is typically accomplished via a DistributedMapCacheClient service. This feature is mainly used to share information across a NiFi cluster but can also be used at local level to store data in memory to be used along the flow.

    It has to be noted that, when using the PutDistributedMapCache processor, the key is given through the processor properties and the value is the content of the incoming FlowFile.

Let’s now describe the data flow I have created for the URL shortener service. First I have a single HandleHttpRequest processor with default configuration and a default StandardHttpContextMap controller service.

Then I use a RouteOnAttribute processor to define which URL has been accessed and to route the FlowFile (FF) accordingly. In the incoming FF, the attribute ‘http.request.uri’ contains the requested URL.

routeOnAttribute

At this point, I have three routes for my FF: one for the /shortlink requests, one for the /admin requests and one for the others.

Let’s start with the ‘admin’ flow part. When I receive a request at:

/admin?key=<key>

My incoming FF will have an attribute ‘http.query.param.key’ with the value <key>. This is useful to retrieve all the arguments passed along the URL.

I take the decision to use the map cache server as follow: the key will be the key value of my JSON data, and the value will be the JSON string itself. Consequently, I use a FetchDistributedMapCache processor to retrieve the JSON data associated to the given key.

fetchdistributedmapcache

If I don’t find anything, I use a ReplaceText processor to set an arbitrary error message as the content of my FF and then a HandleHttpResponse processor with the 500 HTTP error code. This way, the user will see the error message:

No URL found for key=6666

If I find an entry in my cache server, then the FF content is now filled in with my JSON string and I just need to route my FF to a HandleHttpResponse processor with a 200 HTTP code. This way, the user will see the JSON string with related information.

Let’s continue with the “shortlink” part of the flow. First of all I use a RouteOnAttribute processor to check that the URL provided at:

/shortlink?url=<URL>

is valid given a regular expression. If not I display an error message to the user with the combination of ReplaceText processor and HandleHttpResponse processor (as explained above).

If the URL is valid, I want to generate a key associated to this URL.

Note: for this part, I made the choice to keep it really simple and there are a lot of possible improvements/optimizations.

I make the decision that the key will be a 4-digits number and I create this number using time manipulation with the expression language. With an UpdateAttribute processor, I generate a ‘key’ attribute:

keyGeneration

Once the key is generated I use a FetchDistributedMapCache processor to check if this key is already used or not.

If yes, my FF now contains the associated JSON string. I use an EvaluateJsonPath processor to extract the creation date information from the JSON string and then I use a RouteOnAttribute processor to check if this creation date if below the threshold of 24 hours. If yes, I route my FF back to the UpdateAttribute processor to generate a new random key, if no, it means the key can be overwritten and I route my FF to the next steps.

Once I have a generated key that is free to use, I use a ReplaceText processor to construct my JSON string:

JSONconstruction

Then I store this information in the cache using a PutDistributedMapCache processor:

putdistributedmapcache

Note the “replace if present” property in case we are overwriting an already existing key that is too old. Then I just route my FF to a HandleHttpResponse processor with a 200 HTTP code to display to the user the JSON string.

As I said, this part is simple, there are a lot of possible improvements such as (but not limited to):

  • Have a text-based key with the possibility for the user to customize it in order to expand the number of possible shortened URL stored.
  • Use the cache server to store a sequence ID for the generated key in order to avoid randomness and possible loops in the flow.
  • Add the possibility to reuse the same key for two identical URLs to shorten.

Finally, let’s complete our use case with the last part of the flow: when a HTTP request is received which is not shortlink/admin.

The accessed URL is obtained using the FF attributes, and I can directly use a FetchDistributedMapCache processor:

fetchforredirection

If I don’t find any entry in my cache, I route the FF to a combination of ReplaceText and HandleHttpResponse processors to display an error message to the user with a 404 HTTP error code.

If I find a match, I use an EvaluateJsonPath processor to extract the counter value and the long URL from the JSON string retrieved in the cache. Then I first route my FF to a HandleHttpResponse processor with a 307 HTTP code (temporary redirection) and I add a HTTP header property to redirect the user to the corresponding URL:

httpresponse

Note: I use the 307 HTTP code to avoid my browser to cache the redirection and to perform the request each time I access my shortened URL.

I also route my FF to a ReplaceText processor in order to increment the count value in my JSON string and I use a PutDistributedMapCache processor to update the data in the cache.

That’s it! We now have a running URL shortener service with Apache NiFi. The flow is available as a template here. As always, feel free to comment and/or ask questions about this post.