Monitoring NiFi – Workflow SLA

Note – This article is part of a series discussing subjects around NiFi monitoring.

Depending on your workflows and use cases, you may want to retrieve metrics per use case or workflow rather than high-level metrics. One of the things you may be looking for is Workflow SLA.

First of all, we need to agree on what “Workflow SLA” means. In this article, by SLA (Service-Level Agreement) monitoring I mean ensuring that a workflow processes every single event quickly enough. In other words, if an event takes a long time to be processed, I want to know about it so I can understand what is going on. A workflow can be running perfectly well in NiFi (no bulletin generated, no error) while its processing time grows because of external causes (resource exhaustion, network bandwidth to external systems, abnormal event size, etc.).

Luckily for us, every single flow file within NiFi contains two core attributes that are perfectly designed for our needs. From the documentation:

  • entryDate: The date and time at which the FlowFile entered the system (i.e., was created). The value of this attribute is a number that represents the number of milliseconds since midnight, Jan. 1, 1970 (UTC).
  • lineageStartDate: Any time that a FlowFile is cloned, merged, or split, this results in a “child” FlowFile being created. As those children are then cloned, merged, or split, a chain of ancestors is built. This value represents the date and time at which the oldest ancestor entered the system. Another way to think about this is that this attribute represents the latency of the FlowFile through the system. The value is a number that represents the number of milliseconds since midnight, Jan. 1, 1970 (UTC).
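To make the difference between the two attributes concrete, here is a minimal sketch of how they evolve when FlowFiles are derived from one another. The `FlowFile` class and the `create`/`derive` helpers are hypothetical names used only for this illustration; they are not the NiFi API.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class FlowFile:
    """Simplified stand-in for a NiFi FlowFile (hypothetical, not the NiFi API)."""
    entry_date: int          # ms since epoch at which this FlowFile was created
    lineage_start_date: int  # ms since epoch at which its oldest ancestor was created


def create(now_ms: int) -> FlowFile:
    # A brand-new FlowFile has no ancestor, so both dates are equal.
    return FlowFile(entry_date=now_ms, lineage_start_date=now_ms)


def derive(parents: Iterable[FlowFile], now_ms: int) -> FlowFile:
    # A child (result of a clone, merge or split) enters the system now,
    # but inherits the oldest lineage start date among its parents.
    oldest = min(p.lineage_start_date for p in parents)
    return FlowFile(entry_date=now_ms, lineage_start_date=oldest)
```

In this model, merging two FlowFiles created at 1000 and 2000 yields a child whose lineage start date is 1000, whatever its own entry date is, which is why the lineage start date is the one that reflects end-to-end latency.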

In our case we are really interested in the “lineageStartDate” attribute but, depending on your specific use cases and needs, you could also be interested in “entryDate”.

At the end of the workflow I want to monitor, I can use a RouteOnAttribute processor to route the event if and only if the time elapsed since the lineage start date is over a given threshold (let’s say one minute for the example below). If an event took longer than expected, I route the flow file to a PutEmail processor to send an email and receive a notification.

Screen Shot 2017-05-10 at 3.33.34 PM.png

My RouteOnAttribute configuration:

Screen Shot 2017-05-10 at 3.34.02 PM.png
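The check performed by the routing step can be sketched outside NiFi as follows. In NiFi itself this would typically be a single Expression Language property on RouteOnAttribute, something along the lines of `${now():toNumber():minus(${lineageStartDate}):gt(60000)}`; that exact expression is my assumption, not a transcription of the screenshot. The function and constant names below are hypothetical.

```python
import time

SLA_THRESHOLD_MS = 60_000  # one minute, as in the example above


def exceeds_sla(lineage_start_date_ms, now_ms=None, threshold_ms=SLA_THRESHOLD_MS):
    """Return True when the time elapsed since the lineage start is over the threshold."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)  # current time in ms since epoch
    # Attributes are strings in NiFi, so accept both "1494423214000" and an int.
    elapsed_ms = now_ms - int(lineage_start_date_ms)
    return elapsed_ms > threshold_ms
```

Only the flow files for which this returns True would be routed to the notification branch; everything else continues (or ends) as usual.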

This way, at a workflow level, you are able to check if the processing time of an event is meeting your requirements. You could obviously follow the same approach to check the processing time at multiple critical points of the workflow.

Note that, instead of sending an email notification, you could just as well use any other kind of processor to integrate this alert/event with whatever system you are using on your side.

One last comment: I recommend checking out the article about the Ambari Reporting Task and Grafana, where I also discuss Workflow SLA and how to send the metrics to the Ambari Metrics System and display the information in a Grafana dashboard.

As usual, feel free to ask questions and comment on this post.

Monitoring NiFi – Site2Site reporting tasks


If we look at the development documentation about reporting tasks:

So far, we have mentioned little about how to convey to the outside world how NiFi and its components are performing. Is the system able to keep up with the incoming data rate? How much more can the system handle? How much data is processed at the peak time of day versus the least busy time of day?

In order to answer these questions, and many more, NiFi provides a capability for reporting status, statistics, metrics, and monitoring information to external services by means of the ReportingTask interface. ReportingTasks are given access to a host of information to determine how the system is performing.

Out of the box, you have quite a lot of available reporting tasks and, in this article, we are going to focus on a few of them (have a look at the other articles to find out more about the other reporting tasks).

Before going into the details: since I am going to discuss the Site-to-Site reporting tasks, we first need to understand what Site-to-Site (S2S) is:

When sending data from one instance of NiFi to another, there are many different protocols that can be used. The preferred protocol, though, is the NiFi Site-to-Site Protocol. Site-to-Site makes it easy to securely and efficiently transfer data to/from nodes in one NiFi instance or data producing application to nodes in another NiFi instance or other consuming application.

In this case, we are going to use S2S reporting tasks: the reporting tasks run continually to collect data and send it to a remote NiFi instance, but you can just as well use S2S to send the data to the very instance running the reporting task. This way, by using an input port on the canvas, you can receive the data generated by the reporting task and use it in a NiFi workflow. That’s really powerful because you can now use all the NiFi capabilities to process this data and do whatever you want with it.

Let’s go over some examples!

  • Monitoring bulletins

With the new version of Apache NiFi (1.2.0), you can transform every bulletin into a flow file sent to any NiFi instance using Site-to-Site. This way, as soon as a processor, controller service or reporting task generates a bulletin, it can be converted into a flow file that you can use to feed any system you want.

Let’s configure this reporting task to send the bulletins (as flow files) to an input port called “bulletinMonitoring” and use the flow files to send emails.

First, since my NiFi cluster is secured, I create a StandardSSLContextService in the Controller Services tab of the Controller Settings menu (this way, it can be used by reporting tasks).

Screen Shot 2017-05-10 at 12.28.54 PM.png

Screen Shot 2017-05-10 at 12.28.46 PM.png

Then, I can define my reporting task by adding a new SiteToSiteBulletinReportingTask:

Screen Shot 2017-05-10 at 12.30.55 PM.png

Screen Shot 2017-05-10 at 12.31.29 PM.png

Before starting the task, on my canvas, I have the following:

Screen Shot 2017-05-10 at 12.32.55 PM.png

Note – in a secured environment, you need to set the correct permissions on the components. You need to allow NiFi nodes to receive data via site-to-site on the input port and you also need to grant the correct permissions on the root process group so the nodes are able to see the component, view and modify the data.

I configured my PutEmail processor to send emails using the Gmail SMTP server:

Screen Shot 2017-05-10 at 12.45.13 PM.png

Now, as soon as bulletins are generated, I’ll receive an email notification containing my message with the attributes of the flow file, and the bulletins will be attached to the email.

Obviously, instead of sending emails, you could, for example, use some other processors to automatically open tickets in your ticketing system (like JIRA using REST API).
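If you go down that road, you will probably want to filter the bulletins before opening tickets. Here is a minimal sketch of such a filter, assuming the flow file content is a JSON array of bulletins and that each bulletin carries a `bulletinLevel` field; both the field name and the level names are assumptions to check against the data you actually receive.

```python
import json

ALERT_LEVELS = {"WARNING", "ERROR"}  # assumed level names


def bulletins_to_notify(flow_file_content: str) -> list:
    """Return the bulletins worth a notification from a JSON array of bulletins."""
    bulletins = json.loads(flow_file_content)
    selected = []
    for bulletin in bulletins:
        # "bulletinLevel" is an assumed field name; missing values are ignored.
        level = str(bulletin.get("bulletinLevel") or "").upper()
        if level in ALERT_LEVELS:
            selected.append(bulletin)
    return selected
```

Each selected bulletin could then be turned into one ticket or one message, depending on the target system.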

  • Monitoring disk space

Now, using the task we previously set up, we can take advantage of the reporting task that monitors disk space. This reporting task generates WARN logs (in the NiFi log file) and bulletins when the usage of the monitored disk partition goes over a custom threshold. If I want to monitor the Content Repository, I can configure my reporting task as below:

Screen Shot 2017-05-10 at 1.28.24 PM.png

Screen Shot 2017-05-10 at 1.28.34 PM.png

Using the combination of this Reporting Task and the SiteToSiteBulletinReportingTask, I’m able to generate flow files when the disk utilization reaches a critical point and to receive notifications using any processors I want.
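For readers who want to see what “used over a custom threshold” boils down to, here is a small standalone sketch of the same kind of check. It is not the reporting task’s code; the function names and the way the threshold is expressed (a percentage) are assumptions made for the illustration.

```python
import shutil


def used_percent(total_bytes: int, used_bytes: int) -> float:
    """Share of the partition that is used, as a percentage."""
    return 100.0 * used_bytes / total_bytes


def partition_over_threshold(path: str, threshold_percent: float) -> bool:
    """Return True when the partition holding `path` is used over the threshold."""
    usage = shutil.disk_usage(path)  # named tuple with total, used and free bytes
    return used_percent(usage.total, usage.used) > threshold_percent
```

For instance, `partition_over_threshold("/path/to/content_repository", 80.0)` (a hypothetical path) would tell you whether the partition holding that directory is more than 80% full.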

  • Monitoring memory use

The same approach can be used to monitor the memory utilization within the NiFi JVM using the MonitorMemory reporting task. Have a look at the documentation of this reporting task here.

  • Monitoring back pressure on connections

There is also the SiteToSiteStatusReportingTask, which sends details about the NiFi Controller status over Site-to-Site. This can be particularly useful to be notified when some processors are stopped or queues are full (and back pressure is enabled), or to build reports about NiFi performance. This reporting task will be slightly improved regarding back pressure (with NIFI-3791). In the meantime, if you want to receive notifications when back pressure is enabled on any connection, here is what you can do (assuming you know the back pressure thresholds):

Screen Shot 2017-05-10 at 2.07.44 PM.png

Screen Shot 2017-05-10 at 2.09.14 PM.png

Note that I configured the task to only send data about the connections but you can receive information for any kind of component.

I use the following workflow: an input port receives the flow files with the controller status (each containing an array of JSON elements, one per connection); a SplitJson processor splits the array; an EvaluateJsonPath processor extracts the queuedCount and queuedBytes values as attributes; and a RouteOnAttribute processor checks whether either attribute is greater than or equal to my thresholds. If that’s the case, I send the notification by email.

Screen Shot 2017-05-10 at 2.31.23 PM.png

My RouteOnAttribute configuration:

Screen Shot 2017-05-10 at 3.33.46 PM
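The logic of this workflow can be sketched in a few lines outside NiFi. The `queuedCount` and `queuedBytes` fields are the ones extracted above; the threshold values and the function name are assumptions that you would replace with your own back pressure settings.

```python
import json

# Assumed thresholds: adjust them to the back pressure settings of your connections.
MAX_QUEUED_COUNT = 10_000
MAX_QUEUED_BYTES = 1024 ** 3  # 1 GB


def connections_over_threshold(status_json: str) -> list:
    """Return the connection status entries that reached one of the thresholds."""
    alerts = []
    for connection in json.loads(status_json):         # what SplitJson does
        count = int(connection.get("queuedCount", 0))  # what EvaluateJsonPath does
        size = int(connection.get("queuedBytes", 0))
        # What RouteOnAttribute does: either value at or over its threshold.
        if count >= MAX_QUEUED_COUNT or size >= MAX_QUEUED_BYTES:
            alerts.append(connection)
    return alerts
```

Each entry returned here corresponds to one flow file that would be routed to the email notification.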

Site-to-Site reporting tasks are really useful and there are many ways to use the data they send for monitoring purposes.

  • SiteToSiteProvenanceReportingTask

Note that there is also a Site-to-Site reporting task to send all the provenance events over S2S. This can be really useful if you want to send this information to an external system.

While monitoring a dataflow, users often need a way to determine what happened to a particular data object (FlowFile). NiFi’s Data Provenance page provides that information. Because NiFi records and indexes data provenance details as objects flow through the system, users may perform searches, conduct troubleshooting and evaluate things like dataflow compliance and optimization in real time. By default, NiFi updates this information every five minutes, but that is configurable.

Besides, with NIFI-3859, this could also be used in a monitoring approach to look only for specific events according to custom parameters. It could be used, for instance, to check how long it takes for an event to go through NiFi and to raise alerts when an event takes unusually long to be processed (have a look at the next article to see how this can be done differently).


Monitoring NiFi – Logback configuration


There is one configuration file in NiFi that manages all the logging operations performed by NiFi: it is located in the configuration directory (NIFI_HOME/conf) and is named logback.xml. It is good to know that this file can be modified on the fly: NiFi does not need to be restarted for the modifications to be taken into account.

This file is a common configuration file for the logback library which is a successor of the famous log4j project. Here is the official website. I won’t get into the details of logback itself (documentation is here) but here is a quick overview of the default configuration when using NiFi.

The most important log file in NiFi is certainly nifi-app.log which is created according to this configuration block:

    <appender name="APP_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${org.apache.nifi.bootstrap.config.log.dir}/nifi-app.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${org.apache.nifi.bootstrap.config.log.dir}/nifi-app_%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
            <maxFileSize>100MB</maxFileSize>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <immediateFlush>true</immediateFlush>
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="APP_FILE"/>
    </root>

In this case, the log file rolls over every hour or when it grows over 100MB. Besides, we only keep up to 30 files’ worth of history (maxHistory is counted in rollover periods, so with this hourly pattern that is roughly 30 hours of archives). Based on your retention policies, you can update this file to meet your requirements.

Also note that, by default, you will have the following log files:

  • ./logs/nifi-app.log
  • ./logs/nifi-bootstrap.log
  • ./logs/nifi-user.log

Now… what can we do to use this file for monitoring purposes? Some options here:

  • We can use the TailFile processor to let NiFi tail its own log file and perform the required operation when some conditions are met. For example, to send an HTTP request when logs of WARN and ERROR levels are detected:

Screen Shot 2017-05-02 at 9.05.14 PM.png

  • We can also define new appenders in the log configuration file and change it according to our needs. In particular, we could be interested in the SMTP Appender, which can send logs by email based on quite a large set of conditions. Full documentation here.
  • Obviously you can also adapt this configuration file so that the NiFi log files integrate with your existing systems. One idea would be to configure a Syslog appender to also redirect the logs to an external system.
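Whatever option you pick, the detection itself comes down to reading the level of each log line. Here is a minimal sketch of that step, assuming the default pattern of the APP_FILE appender shown earlier (`%date %level [%thread] %logger{40} %msg%n`); the regular expression and function names are mine and would need adjusting if you change the pattern.

```python
import re

# Mirrors the default pattern "%date %level [%thread] %logger{40} %msg%n".
LOG_LINE = re.compile(
    r"^(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) "
    r"(?P<level>[A-Z]+) "
    r"\[(?P<thread>.+?)\] "
    r"(?P<logger>\S+) "
    r"(?P<msg>.*)$"
)


def parse(line: str):
    """Return the fields of a log line as a dict, or None if it does not match."""
    match = LOG_LINE.match(line)
    return match.groupdict() if match else None


def needs_alert(line: str) -> bool:
    """True for WARN and ERROR lines; continuation lines (stack traces) are ignored."""
    parsed = parse(line)
    return parsed is not None and parsed["level"] in {"WARN", "ERROR"}
```

Lines that do not start with a timestamp, such as the lines of a stack trace, simply do not match and are skipped by this sketch.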

A few remarks regarding the logs, as I see recurring questions:

  • At the moment, there is no option to have a log file per processor or per process group. It is discussed by NIFI-3065. However such an approach could have performance implications since each log message would need to be routed to the correct log file.
  • Also, at the moment, the custom name of the processor is not displayed in the log messages (only the type is used). It is discussed by NIFI-3877. Right now it looks like:

2017-05-12 10:43:37,780 INFO [Timer-Driven Process Thread-4] o.a.nifi.processors.standard.InvokeHTTP InvokeHTTP[id=f7ebb153-015b-1000-f590-599786e16340] my log message…

Having the custom name in the log messages could be a solution in a multi-tenant environment (to get one log file per workflow/use case), assuming each “team” follows the same convention to name the components in its workflows.


Monitoring NiFi – Bootstrap notifier


When NiFi is running, there is more than one process running on the server. Let’s have a look:

[root@pvillard-hdf-2 ~]# ps -ef | grep nifi
nifi     19380     1  0 12:33 ?        00:00:00 /bin/sh /usr/hdf/current/nifi/bin/nifi.sh start
nifi     19382 19380  0 12:33 ?        00:00:06 /usr/java/jdk1.8.0_131//bin/java -cp /usr/hdf/current/nifi/conf:/usr/hdf/current/nifi/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.log.dir=/var/log/nifi -Dorg.apache.nifi.bootstrap.config.pid.dir=/var/run/nifi -Dorg.apache.nifi.bootstrap.config.file=/usr/hdf/current/nifi/conf/bootstrap.conf org.apache.nifi.bootstrap.RunNiFi start
nifi     19419 19382  7 12:33 ?        00:07:36 /usr/java/jdk1.8.0_131/bin/java -classpath /usr/hdf/current/nifi/conf:/usr/hdf/current/nifi/lib/jcl-over-slf4j-1.7.12.jar:/usr/hdf/current/nifi/lib/jul-to-slf4j-1.7.12.jar:/usr/hdf/current/nifi/lib/log4j-over-slf4j-1.7.12.jar:/usr/hdf/current/nifi/lib/logback-classic-1.1.3.jar:/usr/hdf/current/nifi/lib/logback-core-1.1.3.jar:/usr/hdf/current/nifi/lib/nifi-runtime-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-api-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-documentation-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-framework-api-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-nar-utils-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/slf4j-api-1.7.12.jar:/usr/hdf/current/nifi/lib/nifi-properties-1.1.0.2.1.2.0-10.jar -Dorg.apache.jasper.compiler.disablejsr199=true -Xmx512m -Xms512m -Dambari.application.id=nifi -Dambari.metrics.collector.url=http://pvillard-hdf-1:6188/ws/v1/timeline/metrics -Dsun.net.http.allowRestrictedHeaders=true -Djava.net.preferIPv4Stack=true -Djava.awt.headless=true -XX:+UseG1GC -Djava.protocol.handler.pkgs=sun.net.www.protocol -Dnifi.properties.file.path=/usr/hdf/current/nifi/conf/nifi.properties -Dnifi.bootstrap.listen.port=39585 -Dapp=NiFi -Dorg.apache.nifi.bootstrap.config.log.dir=/var/log/nifi org.apache.nifi.NiFi -k A2EA52795B33AB2F21C93E7E820D08369F1448478C877F4C710D6E85FD904AE6

As you can see, the script runs a bootstrap (a JVM with a heap between 12 and 24MB) that is in charge of the NiFi JVM itself. In this example, the script runs with the PID 19380 and is the parent of the bootstrap process running with PID 19382. The bootstrap itself is the parent of NiFi, running with the PID 19419.

If you only kill the NiFi process, the bootstrap will detect it and automatically relaunch NiFi for you:

kill -9 19419

Then I have:

nifi     19380     1  0 12:33 ?        00:00:00 /bin/sh /usr/hdf/current/nifi/bin/nifi.sh start
nifi     19382 19380  0 12:33 ?        00:00:06 /usr/java/jdk1.8.0_131//bin/java -cp /usr/hdf/current/nifi/conf:/usr/hdf/current/nifi/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.log.dir=/var/log/nifi -Dorg.apache.nifi.bootstrap.config.pid.dir=/var/run/nifi -Dorg.apache.nifi.bootstrap.config.file=/usr/hdf/current/nifi/conf/bootstrap.conf org.apache.nifi.bootstrap.RunNiFi start
nifi     24702 19382 99 14:20 ?        00:00:05 /usr/java/jdk1.8.0_131/bin/java -classpath /usr/hdf/current/nifi/conf:/usr/hdf/current/nifi/lib/jcl-over-slf4j-1.7.12.jar:/usr/hdf/current/nifi/lib/jul-to-slf4j-1.7.12.jar:/usr/hdf/current/nifi/lib/log4j-over-slf4j-1.7.12.jar:/usr/hdf/current/nifi/lib/logback-classic-1.1.3.jar:/usr/hdf/current/nifi/lib/logback-core-1.1.3.jar:/usr/hdf/current/nifi/lib/nifi-runtime-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-api-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-documentation-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-framework-api-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-nar-utils-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/slf4j-api-1.7.12.jar:/usr/hdf/current/nifi/lib/nifi-properties-1.1.0.2.1.2.0-10.jar -Dorg.apache.jasper.compiler.disablejsr199=true -Xmx512m -Xms512m -Dambari.application.id=nifi -Dambari.metrics.collector.url=http://pvillard-hdf-1:6188/ws/v1/timeline/metrics -Dsun.net.http.allowRestrictedHeaders=true -Djava.net.preferIPv4Stack=true -Djava.awt.headless=true -XX:+UseG1GC -Djava.protocol.handler.pkgs=sun.net.www.protocol -Dnifi.properties.file.path=/usr/hdf/current/nifi/conf/nifi.properties -Dnifi.bootstrap.listen.port=39585 -Dapp=NiFi -Dorg.apache.nifi.bootstrap.config.log.dir=/var/log/nifi org.apache.nifi.NiFi -k A2EA52795B33AB2F21C93E7E820D08369F1448478C877F4C710D6E85FD904AE6

As you can see the NiFi process has a new PID since it is a new process launched by the bootstrap.
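If you want to check this parent/child relationship from a script rather than by eye, here is a small sketch that reads `ps -ef` style output. The helper names are hypothetical; the two class names used as markers (`org.apache.nifi.bootstrap.RunNiFi` and `org.apache.nifi.NiFi`) come from the command lines shown above.

```python
def process_tree(ps_output: str) -> dict:
    """Map each PID to its parent PID from `ps -ef` style lines (UID PID PPID ...)."""
    parents = {}
    for line in ps_output.strip().splitlines():
        fields = line.split()
        # Skip the header line and anything that does not look like a process row.
        if len(fields) >= 3 and fields[1].isdigit() and fields[2].isdigit():
            parents[int(fields[1])] = int(fields[2])
    return parents


def find_pid(ps_output: str, marker: str):
    """Return the PID of the first process whose command line contains `marker`."""
    for line in ps_output.strip().splitlines():
        fields = line.split()
        if len(fields) >= 3 and fields[1].isdigit() and marker in line:
            return int(fields[1])
    return None
```

With the output above, the PID found for `org.apache.nifi.NiFi` has the bootstrap PID as its parent, and after a kill only the NiFi PID changes.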

You can find more information about the bootstrap system in the administration guide. One interesting feature with this bootstrap approach is the bootstrap notifier. It allows you to configure a notification service that will be triggered when the bootstrap starts, stops or detects an interruption of the NiFi process.

With the new version of Apache NiFi (1.2.0), you can send notifications to an HTTP(S) endpoint. If you need a custom notification service (for example, to send SNMP traps), it’s not that hard: you just need to extend AbstractNotificationService. You can have a look at the two existing implementations: email notifier and HTTP notifier.

Let’s configure NiFi to use the email notification service and see the result. On each node of our cluster, we need to update the bootstrap.conf configuration file as below:

###
# Notification Services for notifying interested parties when NiFi is stopped, started, dies
###

# XML File that contains the definitions of the notification services
notification.services.file=./conf/bootstrap-notification-services.xml

# In the case that we are unable to send a notification for an event, how many times should we retry?
notification.max.attempts=5

# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi is started?
nifi.start.notification.services=email-notification

# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi is stopped?
nifi.stop.notification.services=email-notification

# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi dies?
nifi.dead.notification.services=email-notification

In this case, we are saying that the configuration of our notification services (we can define and use multiple notifiers) is in the file

./conf/bootstrap-notification-services.xml

and that we want to use the notifier called “email-notification” (that’s the default name defined in the XML configuration file) for stop, start, and dead events. As stated in the documentation, it’s possible to define a list of notifiers for each type of event.
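To double check which notifiers are wired to which event on a given node, the relevant properties can be read with a few lines of code. This is only a sketch with a hypothetical function name; it handles plain `key=value` lines and `#` comments, which is all the excerpt above needs.

```python
EVENTS = ("start", "stop", "dead")


def notifiers_per_event(bootstrap_conf: str) -> dict:
    """Return {event: [notifier ids]} from the text of a bootstrap.conf file."""
    properties = {}
    for raw in bootstrap_conf.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blank lines, comments and anything that is not key=value
        key, value = line.split("=", 1)
        properties[key.strip()] = value.strip()

    result = {}
    for event in EVENTS:
        value = properties.get(f"nifi.{event}.notification.services", "")
        # The value is a comma-separated list of identifiers.
        result[event] = [s.strip() for s in value.split(",") if s.strip()]
    return result
```

An event with no property, or with an empty value, simply maps to an empty list.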

For this demonstration, I’ll use my Gmail account and the Gmail SMTP server to send the notifications (obviously, with this example, NiFi needs public internet access to send the requests to the SMTP server). Here is the configuration file:

<services>
     <service>
        <id>email-notification</id>
        <class>org.apache.nifi.bootstrap.notification.email.EmailNotificationService</class>
        <property name="SMTP Hostname">smtp.gmail.com</property>
        <property name="SMTP Port">587</property>
        <property name="SMTP Username">my-email-address@gmail.com</property>
        <property name="SMTP Password">myPassword</property>
        <property name="SMTP TLS">true</property>
        <property name="From">"NiFi Service Notifier"</property>
        <property name="To">email address that will receive the notifications</property>
     </service>
</services>

Here are the emails I received when restarting NiFi or killing the NiFi process:

  • Start event

Title: NiFi Started on Host pvillard-hdf-2 (172.26.249.33)

Hello,

Apache NiFi has been started on host pvillard-hdf-2 (172.26.249.33) at 2017/04/27 22:15:01.376 by user nifi

  • Stop event

Title: NiFi Stopped on Host pvillard-hdf-2 (172.26.249.33)

Hello,

Apache NiFi has been told to initiate a shutdown on host pvillard-hdf-2 (172.26.249.33) at 2017/04/27 22:14:35.702 by user nifi

  • Dead event

Title: NiFi Died on Host pvillard-hdf-2 (172.26.249.33)

Hello,

It appears that Apache NiFi has died on host pvillard-hdf-2 (172.26.249.33) at 2017/04/28 09:52:01.973; automatically restarting NiFi

OK, now let’s see how to configure the HTTP notification service. Let’s say I have a web server listening here:

http://pvillard-hdf-4:9999/notification
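If you don’t have such a web server at hand, a throwaway listener is enough for testing. Below is a minimal sketch using only the Python standard library; it assumes the notifier sends an HTTP POST and simply keeps whatever it receives in memory. The handler, the `received` list and `make_server` are hypothetical names for this example.

```python
from http.server import BaseHTTPRequestHandler, HTTPServer

received = []  # notifications seen so far, kept in memory for the demo


class NotificationHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read the request body, if any, and remember it with its path and headers.
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length).decode("utf-8", errors="replace")
        received.append({"path": self.path, "headers": dict(self.headers), "body": body})
        self.send_response(200)
        self.end_headers()

    def log_message(self, format, *args):
        pass  # keep the console quiet


def make_server(port: int = 9999) -> HTTPServer:
    """Create (but do not start) a listener on all interfaces on the given port."""
    return HTTPServer(("", port), NotificationHandler)
```

Calling `make_server().serve_forever()` on the target host would then listen on port 9999 and record every request posted to it, including those sent to /notification.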

My XML configuration is now:

<services>
     <service>
        <id>email-notification</id>
        <class>org.apache.nifi.bootstrap.notification.email.EmailNotificationService</class>
        <property name="SMTP Hostname">smtp.gmail.com</property>
        <property name="SMTP Port">587</property>
        <property name="SMTP Username">my-email-address@gmail.com</property>
        <property name="SMTP Password">myPassword</property>
        <property name="SMTP TLS">true</property>
        <property name="From">"NiFi Service Notifier"</property>
        <property name="To">email address that will receive the notifications</property>
     </service>
     <service>
        <id>http-notification</id>
        <class>org.apache.nifi.bootstrap.notification.http.HttpNotificationService</class>
        <property name="URL">http://pvillard-hdf-4:9999/notification</property>
     </service>
</services>

And my bootstrap.conf configuration file now contains:

###
# Notification Services for notifying interested parties when NiFi is stopped, started, dies
###

# XML File that contains the definitions of the notification services
notification.services.file=./conf/bootstrap-notification-services.xml

# In the case that we are unable to send a notification for an event, how many times should we retry?
notification.max.attempts=5

# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi is started?
nifi.start.notification.services=email-notification,http-notification

# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi is stopped?
nifi.stop.notification.services=email-notification,http-notification

# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi dies?
nifi.dead.notification.services=email-notification,http-notification
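With several notifiers in play, a typo in an identifier is easy to make. Here is a small sketch that cross-checks the identifiers used in bootstrap.conf against the ones defined in the XML file. The function names are hypothetical, and the sketch only relies on the `<services>/<service>/<id>` structure shown above.

```python
import xml.etree.ElementTree as ET


def defined_service_ids(services_xml: str) -> set:
    """Return the ids declared in a bootstrap-notification-services.xml document."""
    root = ET.fromstring(services_xml)
    return {service.findtext("id", "").strip() for service in root.findall("service")}


def undefined_notifiers(services_xml: str, configured: dict) -> dict:
    """Return {property: [ids]} for ids used in bootstrap.conf but absent from the XML.

    `configured` maps a property name to its raw comma-separated value.
    """
    defined = defined_service_ids(services_xml)
    missing = {}
    for prop, value in configured.items():
        ids = [i.strip() for i in value.split(",") if i.strip()]
        unknown = [i for i in ids if i not in defined]
        if unknown:
            missing[prop] = unknown
    return missing
```

An empty result means every identifier referenced for the start, stop and dead events has a matching service definition.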

I now restart NiFi, and I can confirm that I receive a notification (I used a standalone NiFi to confirm the reception of the notification):

Screen Shot 2017-05-02 at 5.15.29 PM

The content of the notification is the same as with the email notification service. Note that it’s also possible to configure properties for a keystore and a truststore to send notifications using HTTPS:

<service>
   <id>http-notification</id>
   <class>org.apache.nifi.bootstrap.notification.http.HttpNotificationService</class>

   <!-- The URL to send the notification to -->
   <property name="URL"></property>

   <!-- Max wait time for connection to remote service - default is 10s -->
   <property name="Connection timeout"></property>

   <!-- Max wait time for remote service to read the request sent - default is 10s -->
   <property name="Write timeout"></property>

   <!-- The fully-qualified filename of the Truststore -->
   <property name="Truststore Filename"></property>

   <!-- The Type of the Truststore. Either JKS or PKCS12 -->
   <property name="Truststore Type"></property>

   <!-- The password for the Truststore -->
   <property name="Truststore Password"></property>

   <!-- The fully-qualified filename of the Keystore -->
   <property name="Keystore Filename"></property>

   <!-- The Type of the Keystore. Either JKS or PKCS12 -->
   <property name="Keystore Type"></property>

   <!-- The password for the Keystore -->
   <property name="Keystore Password"></property>

   <!-- The password for the key. If this is not specified, but the Keystore Filename, Password, and Type are specified, then the Keystore Password will be assumed to be the same as the Key Password -->
   <property name="Key Password"></property>

   <!-- The algorithm to use for this SSL context. Either TLS or SSL -->
   <property name="SSL Protocol"></property>
</service>

Note that I also received an email notification since I specified the two notifiers in the configuration.

Again, if you need a custom notifier, this should not be really difficult and you are more than welcome to contribute your notifier code to the Apache community. It will give more options to users when setting a bootstrap notifier.


Monitoring NiFi – Introduction

Apache NiFi 1.2.0 has just been released with a lot of very cool new features… and I am taking this opportunity to start a series of articles about monitoring. This is a recurring subject and I often hear the same questions. This series won’t provide an exhaustive list of the ways you can monitor NiFi (with or without HDF) but, at least, it should get you started!

Here is a quick summary of the subjects that will be covered:

For this series of articles, I will use, as a demo environment, a 4-node HDF cluster (running on CentOS 7):

I’m using HDF to take advantage of Ambari to ease the deployment but this is not mandatory for what I’m going to discuss in the articles (except for stuff around the Ambari reporting task obviously).

I will not cover how to set up this environment but, if this is something you are looking for, feel free to ask questions (here or on the Hortonworks Community Connection) and to have a look at the Hortonworks documentation about HDF.

I don’t want to write a single (very) long article so, for the sake of clarity, there is one article per listed subject. Also, I’ll try to update the articles over time to keep them as close as possible to the latest features provided by NiFi.

Also, if you feel that some subjects should be added to the list, let me know and I’ll do my best to cover other monitoring-related questions.

List/Fetch pattern and Remote Process Group in Apache NiFi

Note – if you’re using NiFi 1.8+, this post is no longer up to date. It is useful to understand how NiFi works but things have changed a bit. Have a look here.

I see a lot of questions about how the List[X]/Fetch[X] processors work and how to load balance the data over the nodes of a NiFi cluster once the data is already in the cluster. Since the question comes up quite often, let’s discuss the subject and try to understand how things work here.

I will assume that you are running a NiFi cluster since there is no problem about data balancing with a standalone instance 😉

The first thing to understand is: when running a cluster, one of the nodes is randomly designated as the “Primary node”. The election takes place when the cluster starts, and there is no way to decide which node will be the primary node. OK… you could force things when your cluster starts but there is no point in doing so if you want real high availability. Long story short: any node may have to be the Primary node at some point, so don’t assume that the Primary node will be one node in particular.

The second thing to understand is: the flow that you are designing on your canvas is running on all the nodes independently. Each node of the cluster is responsible for its own data and a relationship between two processors does not mean that the data going into this relationship will be balanced over the nodes. Unless you use a Remote Process Group (see below), the data will remain on the same node from the beginning to the end of the flow.

I will use the following example to illustrate my explanations: I want to get files from a remote SFTP server and put the files into HDFS.

  • First idea (bad idea!) / GetSFTP -> PutHDFS

Screen Shot 2017-02-23 at 11.04.42 AM.png

The first option could be the Get/Put pattern, which is perfectly fine with a standalone instance. However, this will cause issues if you have a NiFi cluster. Remember? The flow is running on all hosts of your cluster. The problem is that your nodes will concurrently access the same files on the SFTP server… and if the processor is configured to delete the file once retrieved (default behavior), you will have errors showing up. Conclusion: always keep in mind that a processor is running on all the nodes and can cause concurrent access errors depending on the remote system.

  • Second idea (not efficient!) / GetSFTP on Primary Node -> PutHDFS

The second option is to configure the GetSFTP processor to only run on the Primary Node (in the Scheduling tab of the processor configuration):

Screen Shot 2017-02-23 at 11.10.12 AM.png

This way, you will solve the concurrent access problem since only one node of your cluster (the Primary node) will run the GetSFTP processor.

Brief aside: remember, the flow is running on all the nodes; however, if the processor is configured to run on the primary node only, the processor won’t be scheduled on the nodes that are not the primary node. That’s all.

The problem with this approach is that it’s not efficient at all. You get data from only one node (this does not scale at all) and, in the end, only the primary node of your cluster actually handles the data. Why? Because, unless you explicitly use a remote process group, the data remains on the same node from the beginning to the end. In this case, only the primary node actually gets data from the SFTP server and pushes it into HDFS.

  • Recommended pattern : ListSFTP -> RPG / Input Port -> FetchSFTP -> PutHDFS

To solve these issues, the List/Fetch pattern has been developed and is widely used by a lot of processors. The idea is the following: the List processor will only list the data to retrieve on the remote system and get the associated metadata (it will not get the data itself). For each listed item, a flow file (with no content) will be generated and attributes will be populated with the metadata. Then the flow file is sent to the Fetch processor to actually retrieve the data from the remote system based on the metadata (it can be the path of the file on the remote system for example). Since each flow file contains the metadata of a specific item on the remote system, you won’t have concurrent accesses even if you have multiple Fetch processors running in parallel.
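To make the split between the two steps concrete, here is a minimal Python sketch of the idea. It is not NiFi code: the in-memory REMOTE_FILES dictionary, the FlowFile class and the two functions are hypothetical stand-ins for the remote server, a flow file, and the List and Fetch processors.

```python
from dataclasses import dataclass

# Hypothetical in-memory stand-in for the remote SFTP server: path -> content.
REMOTE_FILES = {
    "/data/a.csv": b"a-content",
    "/data/b.csv": b"b-content",
    "/data/c.csv": b"c-content",
}


@dataclass
class FlowFile:
    attributes: dict
    content: bytes = b""  # the List step never fills this in


def list_remote(remote):
    """List step: emit one empty flow file per remote file, metadata only."""
    flow_files = []
    for full_path in sorted(remote):
        directory, _, name = full_path.rpartition("/")
        flow_files.append(FlowFile({"path": directory, "filename": name}))
    return flow_files


def fetch_remote(flow_file, remote):
    """Fetch step: use the attributes to retrieve exactly one file."""
    full_path = flow_file.attributes["path"] + "/" + flow_file.attributes["filename"]
    return FlowFile(dict(flow_file.attributes), remote[full_path])


listed = list_remote(REMOTE_FILES)
fetched = [fetch_remote(ff, REMOTE_FILES) for ff in listed]
```

Because each listed flow file points at exactly one remote file, several fetchers can work in parallel without ever asking for the same file.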

Obviously the List processor is meant to be run on the Primary node only. Then you have to balance the generated flow files over the nodes so that the Fetch processor on each node is dealing with flow files. For this purpose you have to use a Remote Process Group.

A Remote Process Group is an abstract object used to connect two NiFi setups together (the communication between the two NiFi instances is what we call Site-to-Site or S2S). It can be a MiNiFi instance to a NiFi cluster, a NiFi cluster to another NiFi cluster, a NiFi standalone to a NiFi cluster, etc. And it can also be used to connect a NiFi cluster to itself! This way the flow files will be balanced over all the nodes of the cluster. A few things to know about a Remote Process Group:

  1. You need to have an input port on the remote instance you are connecting to (in our case, you need a remote input port on your canvas).
  2. The address you give when configuring your remote process group does not matter in terms of high availability: once the connection is established with one of the nodes of the remote instance, the remote process group will be aware of all the nodes of the remote instance and will manage the case where the node specified in the address goes down.
  3. Your instances need to be configured to allow remote access. The required properties are:

# Site to Site properties
nifi.remote.input.host=
nifi.remote.input.secure=
nifi.remote.input.socket.port=
nifi.remote.input.http.enabled=
nifi.remote.input.http.transaction.ttl=

In the case of our SFTP example, it looks like:

Screen Shot 2017-02-23 at 11.47.40 AM.png

Let’s try to understand what is going on from a cluster perspective. Here is what we have in the case of a 3-node NiFi cluster with ListSFTP running on the primary node only:

Screen Shot 2017-02-23 at 12.03.22 PM.png

The ListSFTP when scheduled is going to list the three files on my remote SFTP server and will generate one flow file for each remote file. Each flow file won’t have any content but will have attributes with metadata of the remote files. In the case of ListSFTP, I’ll have (check the documentation at the “Write attributes” paragraph):

Name Description
sftp.remote.host The hostname of the SFTP Server
sftp.remote.port The port that was connected to on the SFTP Server
sftp.listing.user The username of the user that performed the SFTP Listing
file.owner The numeric owner id of the source file
file.group The numeric group id of the source file
file.permissions The read/write/execute permissions of the source file
file.lastModifiedTime The timestamp of when the file in the filesystem was last modified as yyyy-MM-dd'T'HH:mm:ssZ
filename The name of the file on the SFTP Server
path The fully qualified name of the directory on the SFTP Server from which the file was pulled

The ListSFTP processor will generate 3 flow files and, for now, all flow files are only on the primary node:

Screen Shot 2017-02-23 at 1.59.43 PM.png

Now the Remote Process Group has been configured to connect to the cluster itself, and I set the relationship going from ListSFTP to the Remote Process Group to connect with the input port I created (you may have multiple input ports in the remote system and you can choose which input port to connect to, depending on your needs). When the RPG (Remote Process Group) has the communication enabled, the RPG is aware of the three nodes and will balance the data to each remote node (be aware that there are a lot of Site-to-Site parameters to improve efficiency). In my case that would give something like:

Screen Shot 2017-02-23 at 1.59.55 PM.png

Note: that would be an ideal case in terms of balancing but, for efficiency purposes, the Site-to-Site mechanism might send batches of flow files to the remote node. In the above example, with only 3 flow files, I would probably not end up with one flow file per node.
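The effect of batching can be illustrated with a deliberately simplified Python model. This is not the actual Site-to-Site algorithm; the distribute function and the batch_size parameter are assumptions made for the illustration only.

```python
from itertools import cycle


def distribute(flow_files, nodes, batch_size):
    """Send consecutive batches of flow files to nodes in round-robin order."""
    assignment = {node: [] for node in nodes}
    targets = cycle(nodes)
    for start in range(0, len(flow_files), batch_size):
        node = next(targets)
        assignment[node].extend(flow_files[start:start + batch_size])
    return assignment


files = ["file1", "file2", "file3"]
nodes = ["node-1", "node-2", "node-3"]

# One flow file per transaction: the ideal picture shown above.
ideal = distribute(files, nodes, batch_size=1)

# A batch larger than the listing: everything lands on the first node.
batched = distribute(files, nodes, batch_size=100)
```

With a batch size of 1 every node receives one flow file, whereas a large batch sends all three flow files to the same node, which is why a tiny listing rarely ends up perfectly balanced.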

Now, since we have everything in the attributes of our flow files, we need to use the Expression Language to set the properties of the FetchSFTP processor to use the attributes of the incoming flow files:

screen-shot-2017-02-23-at-1-55-13-pm
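As a rough illustration of what the Expression Language does here, the following Python sketch substitutes ${attribute} references with the attribute values of an incoming flow file. The resolver is a toy that only handles plain references, and the property values and attribute values shown are assumptions for the example (the attribute names are the ones written by ListSFTP).

```python
import re


def resolve(expression, attributes):
    """Substitute ${name} references; this toy resolver supports nothing else."""
    return re.sub(r"\$\{([^}]+)\}", lambda m: attributes[m.group(1)], expression)


# Attributes as ListSFTP would write them (values are made up).
attributes = {
    "sftp.remote.host": "sftp.example.com",
    "sftp.remote.port": "22",
    "sftp.listing.user": "nifi",
    "path": "/data/incoming",
    "filename": "report.csv",
}

# Assumed FetchSFTP property values, expressed with attribute references.
fetch_properties = {
    "Hostname": "${sftp.remote.host}",
    "Port": "${sftp.remote.port}",
    "Username": "${sftp.listing.user}",
    "Remote File": "${path}/${filename}",
}

resolved = {name: resolve(value, attributes) for name, value in fetch_properties.items()}
```

Each flow file therefore drives the processor towards its own remote file, whichever node it ended up on.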

This way, each instance of the FetchSFTP processor will take care of its own file (to actually fetch the content of the remote data) and there won’t be any concurrent access:

Screen Shot 2017-02-23 at 2.00.12 PM.png

All your nodes are retrieving data and you really can scale up your cluster depending on your requirements. Note also that the PutHDFS won’t be an issue either since each node will write its own file.

As I said previously, a lot of processors are embracing this pattern (and this is the recommended way to use such processors with a NiFi cluster), and I’d strongly encourage you to do the same when developing your custom processors.

As always questions/comments are welcome.

Debugging Hadoop WebHDFS API

Last week, I found myself unable to use the WebHDFS REST API through an ETL tool. The only error message I got was:

HTTP 400 Bad Request

By looking at the following documentation, I understood that there were only two options:

  • IllegalArgumentException
  • UnsupportedOperationException

Not really helpful, so I decided to make the API calls myself with curl requests… and here it was with a simple query to list files at the root folder of HDFS as user “123”:

$ curl -i "http://mynode:50070/webhdfs/v1/?op=LISTSTATUS&user.name=123"
HTTP/1.1 400 Bad Request
Content-Type: application/json; charset=utf-8
Content-Length: 209
Connection: close

{"RemoteException":{"exception":"IllegalArgumentException","javaClassName":"java.lang.IllegalArgumentException","message":"Invalid value: \"123\" does not belong to the domain ^[A-Za-z_][A-Za-z0-9._-]*[$]?$"}}

Conclusion is: the user name used with the query is checked against a regular expression and, if not validated, the above exception is returned. The default regular expression being:

^[A-Za-z_][A-Za-z0-9._-]*[$]?$

I was unable to use WebHDFS because my username started with digits… I understand that this is kind of an edge case: it is rare to have such a format in user names but still… I couldn’t believe I was blocked because of this.
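The check is easy to reproduce outside of Hadoop. Here is a small Python sketch applying the same default regular expression to a few user names; the is_valid_user helper and the sample names are mine, only the pattern comes from the error message above.

```python
import re

# Default pattern reported in the IllegalArgumentException message.
DEFAULT_USER_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9._-]*[$]?$")


def is_valid_user(name):
    """Return True if the user name is accepted by the default pattern."""
    return DEFAULT_USER_PATTERN.match(name) is not None


results = {
    name: is_valid_user(name)
    for name in ["hdfs", "_svc.etl", "machine$", "123", "1user"]
}
```

Any name whose first character is a digit is rejected, whatever follows it.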

After a quick search, I found HDFS-4983 that exposed a property in the HDFS configuration file (from Apache Hadoop 2.3.0) to change the default regular expression. Great! I changed the property (dfs.webhdfs.user.provider.user.pattern), restarted my HDFS service and tested my curl request. The request was successful so I restarted my ETL workflow… and… got the same error! Kind of unexpected…

Back to the basics: making curl requests… My ETL workflow was just trying to load a file into HDFS… So let’s do that manually:

$ curl -i -X PUT "http://mynode:50070/webhdfs/v1/tmp/test.txt?op=CREATE&user.name=123"
HTTP/1.1 307 TEMPORARY_REDIRECT
Cache-Control: no-cache
Expires: Sat, 04 Feb 2017 10:19:38 GMT
Date: Sat, 04 Feb 2017 10:19:38 GMT
Pragma: no-cache
Expires: Sat, 04 Feb 2017 10:19:38 GMT
Date: Sat, 04 Feb 2017 10:19:38 GMT
Pragma: no-cache
X-FRAME-OPTIONS: SAMEORIGIN
Set-Cookie: hadoop.auth="u=123&p=123&t=simple&e=1486239578624&s=UrzCjP0SPpPKDJnSYB5BsKuQVKc="; Path=/; HttpOnly
Location: http://mynode:50075/webhdfs/v1/tmp/test.txt?op=CREATE&user.name=123&namenoderpcaddress=mynode:8020&createflag=&createparent=true&overwrite=false
Content-Type: application/octet-stream
Content-Length: 0

$ curl -i -X PUT -T test.txt "http://mynode:50075/webhdfs/v1/tmp/test.txt?op=CREATE&user.name=123&namenoderpcaddress=mynode:8020&createflag=&createparent=true&overwrite=false"
HTTP/1.1 400 Bad Request
Content-Type: application/json; charset=utf-8
Content-Length: 209
Connection: close

{"RemoteException":{"exception":"IllegalArgumentException","javaClassName":"java.lang.IllegalArgumentException","message":"Invalid value: \"123\" does not belong to the domain ^[A-Za-z_][A-Za-z0-9._-]*[$]?$"}}

As explained here, the first step is to make a request against the Name Node to get the address of a Data Node where the file data is to be written. Then make the second request directly to the Data Node to actually send the data (in this case it’s my test.txt file). And… here is the error again!
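For reference, the same two-step exchange can be sketched in Python with the standard library only. This is a minimal sketch under simple assumptions (no security enabled, user passed through user.name); the create_url and webhdfs_create helpers are hypothetical names, and only the URL is built when the snippet is loaded, no request is sent.

```python
from urllib.parse import urlencode, urlsplit
import http.client


def create_url(host, port, hdfs_path, user):
    """Build the CREATE URL to call on the Name Node."""
    query = urlencode({"op": "CREATE", "user.name": user})
    return f"http://{host}:{port}/webhdfs/v1{hdfs_path}?{query}"


def webhdfs_create(namenode_host, namenode_port, hdfs_path, user, data):
    """Two-step upload: ask the Name Node, then send data to the Data Node."""
    # Step 1: ask the Name Node where to write; no body is sent here.
    first = urlsplit(create_url(namenode_host, namenode_port, hdfs_path, user))
    conn = http.client.HTTPConnection(first.hostname, first.port)
    conn.request("PUT", f"{first.path}?{first.query}")
    response = conn.getresponse()
    response.read()
    location = response.getheader("Location")
    conn.close()
    if response.status != 307 or location is None:
        raise RuntimeError(f"expected a 307 redirect, got {response.status}")

    # Step 2: send the data to the Data Node named in the Location header.
    second = urlsplit(location)
    conn = http.client.HTTPConnection(second.hostname, second.port)
    conn.request("PUT", f"{second.path}?{second.query}", body=data)
    response = conn.getresponse()
    body = response.read()
    conn.close()
    return response.status, body


namenode_url = create_url("mynode", 50070, "/tmp/test.txt", "123")
```

Keeping the two steps separate makes it obvious which of the two web interfaces is rejecting the request.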

So it seems that HDFS-4983 only fixes read access against the Name Node web interface, not calls to the web interface running in a Data Node… I checked out the code (be careful to check out the code corresponding to the exact version you are running, otherwise attaching a debugger won’t be helpful; in my case I got the sources from the Hortonworks repositories), and imported it into my Eclipse IDE. A quick search in the code for occurrences of the regular expression led me to the UserParam class where I found a static import:

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT;

And:

private static Domain domain = new Domain(NAME, Pattern.compile(DFS_WEBHDFS_USER_PATTERN_DEFAULT));

Meaning that if this domain attribute is not overridden somewhere by a call, then the default regular expression will be used. It really looks like what we are experiencing!

Just to confirm my assumption, I modified the bootstrap of the Data Node JVM to attach a debugger by adding the following parameters when the JVM is launched:

-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=9999

This way, in Eclipse, I can launch a remote debugger pointing to my Data Node on port 9999 (ensure this port is open and available) and define all the breakpoints I want. In this case I set a breakpoint in the channelRead0 method of the WebHdfsHandler class (which is the entry point of the HTTP listener running in the Data Node). This way I confirmed that when checking the User Group Information:

ugi = ugiProvider.ugi();

The user name is checked using the default regular expression. By looking at the patch in HDFS-4983, it was easy to figure out what needed to be added in the code to have the property correctly set in the Data Node web handler. And that’s how I created HDFS-11391 and the associated pull request.

One line of code to fix this issue. Sometimes it’s not that easy. Anyway, it gave me the opportunity to contribute to the Apache Hadoop project… Sweet.

HAProxy load balancing in front of Apache NiFi

There are a lot of situations where load balancing is necessary, even more so when you are in a clustered architecture. In the context of NiFi you may want to consider two situations:

  • You have a NiFi cluster and you don’t want to give users the IP address of your NiFi nodes to access the UI (remember: every node of the cluster can be used for the UI or to make REST API calls) and besides you want to load balance users on every node of the cluster. In addition, in case you scale up/down your cluster, you don’t want to inform all the users of such a change.
  • You have Listen[X] processors (HTTP, TCP, UDP, Syslog, etc) in your workflow that are not running on the primary node only and you want to have all the nodes receiving the data (to increase performance and have full scalability). Again you don’t want to configure/inform the data senders with all the IPs of your NiFi nodes, and in case of a cluster change you don’t want to make changes on the client side.

In such situations you need a load balancer that will stand in front of your NiFi cluster and will provide a VIP (Virtual IP). For this purpose, you have hardware options, software options and also cloud options. Keep in mind that, if you want a production setup, you’ll need to have your load balancer installed in a high availability fashion otherwise it’ll become a single point of failure.

In this article, I’ll use HAProxy which is the most widely used open source software based load balancing solution. And I’ll launch my HAProxy instance in a Docker container (I’ll also use Portainer as a UI helping me to manage my Docker environment).

Some useful links that can help you with the different tools I’m going to use:

I assume I already have my secured 3-node NiFi cluster up and running. I’ve also already set up my Docker and Portainer environment: I’m ready to pull the HAProxy Docker image and to create my container.

Let’s start with my HAProxy instance. First of all, in Portainer, I just need to pull the image I want (in this case I want the latest version and I just need to enter haproxy):

Screen Shot 2017-02-08 at 12.23.09 PM.png

I now have the image available:

screen-shot-2017-02-08-at-12-24-14-pm

Let’s build a container. In order to ease the modification of the configuration file of HAProxy, I’ll define a bind mount to bind the configuration file inside the container to a file I have on my computer. I also need to define all the ports I want to expose in my container. Since my container will run on one of the nodes of my cluster I don’t want to use the same ports (but in practice, your HAProxy would probably be on its own host).

  • Open my container on 9443, and I’ll configure HAProxy to listen on 9443 and to redirect requests made on 9443 to my NiFi nodes on port 8443 (port of my NiFi UI)
  • Open my container on 9999, and I’ll configure HAProxy to listen on 9999 and to redirect requests made on 9999 to my NiFi nodes on port 8888 (port that I’ll use for my ListenHTTP processors)
  • Open my container on 1936 and map on port 1936 inside my container (that’s the port used by the HAProxy management UI)

In Portainer, I add a container and define the following:

Screen Shot 2017-02-08 at 2.42.02 PM.png

When I create my container, it will immediately stop because I didn’t configure my configuration file so far and the container won’t start if the configuration file is invalid or does not exist. You can check the logs of your container in Portainer in case of issue.

In our case we’ll have HTTP access to send data to our ListenHTTP processors and HTTPS to access the UI. For HTTPS, I don’t want my load balancer to take care of certificates; I just want my requests to go through (pure TCP load balancing). This raises an issue for users accessing the UI because the certificate presented by the NiFi node won’t match the address the client is requesting (the load balancer address). This could be solved by adding a SAN (Subject Alternative Name) to the certificate of the nodes, but that will be discussed in another article (in the meantime you can have a look at NIFI-3331). In this case (and just because this is a demo!), I’ll accept the SSL exceptions in my browser.

OK so let’s see my HAProxy configuration file:

global

defaults
    log     global
    mode    http
    option  httplog
    option  dontlognull
    timeout connect 5000
    timeout client  50000
    timeout server  50000

frontend nifi-ui
    bind *:9443
    mode tcp
    default_backend nodes-ui

backend nodes-ui
    mode tcp
    balance roundrobin
    stick-table type ip size 200k expire 30m
    stick on src
    option httpchk HEAD / HTTP/1.1\r\nHost:localhost
    server nifi01 node-1:8443 check check-ssl verify none
    server nifi02 node-2:8443 check check-ssl verify none
    server nifi03 node-3:8443 check check-ssl verify none

frontend nifi-listen-http
    bind *:9999
    mode http
    default_backend nodes-http

backend nodes-http
    mode http
    balance roundrobin
    option forwardfor
    http-request set-header X-Forwarded-Port %[dst_port]
    option httpchk HEAD / HTTP/1.1\r\nHost:localhost
    server nifi01 node-1:8888 check
    server nifi02 node-2:8888 check
    server nifi03 node-3:8888 check

listen stats
    bind *:1936
    mode http
    stats enable
    stats uri /
    stats hide-version
    stats auth admin:password

Let’s see the different parts:

  • global – I put nothing in here
  • defaults – Just some timeouts and log configuration, nothing special
  • frontend nifi-ui – here I define a front end called “nifi-ui”. This is where I define on what port HAProxy should listen (in this case on port 9443, for TCP mode) and where I should redirect the requests (in this case to my back end called nodes-ui).
  • backend nodes-ui – here I define the back end to which the requests received by my front end are redirected. I define TCP mode, round robin load balancing, stickiness (to ensure that a connected user, based on their IP, will remain on the same node over multiple requests), and the nodes available. I also define the health check operation performed by HAProxy to confirm whether nodes are up or down. In this case it’s a HEAD HTTP request and I don’t check the SSL certificates.
  • frontend nifi-listen-http – here I define the front end that will be used to send data to my ListenHTTP processors on port 9999.
  • backend nodes-http – here I define the back end with my servers, the HTTP mode, the health check, and also the addition of two HTTP headers (x-forwarded-port and x-forwarded-for) to keep track of the IP and port of my client (otherwise I’d only know about the IP and port of my load balancer when receiving data in NiFi).
  • listen stats – here are some parameters about the HAProxy management UI, like the port, the login and password, etc.
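To picture how round robin and stickiness combine in the nodes-ui back end, here is a simplified Python model. It is only a sketch of the behaviour, not of HAProxy internals: the StickyRoundRobin class is hypothetical and it ignores entry expiration, table size limits and health checks.

```python
from itertools import cycle


class StickyRoundRobin:
    """Round robin for new clients, same server for already known source IPs."""

    def __init__(self, servers):
        self._next_server = cycle(servers)
        self._stick_table = {}  # source IP -> server

    def pick(self, source_ip):
        # A source IP seen for the first time gets the next server in the rotation.
        if source_ip not in self._stick_table:
            self._stick_table[source_ip] = next(self._next_server)
        return self._stick_table[source_ip]


balancer = StickyRoundRobin(["nifi01", "nifi02", "nifi03"])
clients = ["10.0.0.1", "10.0.0.2", "10.0.0.1", "10.0.0.3", "10.0.0.2"]
picks = [balancer.pick(ip) for ip in clients]
```

New clients are spread over the three nodes while a returning client always reaches the node it was first assigned to, which matters for the UI since the user session lives on a given node.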

I restart my container to take the configuration into account, and I can have a look at the management UI:

screen-shot-2017-02-08-at-4-37-45-pm

All is green meaning that our health checks are OK.

Assuming my load balancer has the following host name “my-nifi-vip”, I can now access the UI through https://my-nifi-vip:9443/nifi :

screen-shot-2017-02-08-at-4-41-53-pm

On my UI, I can configure a ListenHTTP processor as below:

Screen Shot 2017-02-08 at 4.43.14 PM.png

screen-shot-2017-02-08-at-4-45-05-pm

And send data to my cluster using the virtual IP provided by my load balancer:

while true; do curl -X POST http://my-nifi-vip:9999/test; done;

In the HAProxy management UI, I can confirm that my requests are correctly load balanced on all the nodes of my cluster:

screen-shot-2017-02-08-at-5-17-01-pm

It is now really easy to add or remove new nodes in your NiFi cluster without impacting the clients sending data into NiFi since they only need to know about the virtual IP exposed by the load balancer. You just need to update the configuration file when you are adding nodes, and restart your HAProxy container.

Keep in mind that even if the data is correctly load balanced over the nodes, all the requests are going through a single point, the load balancer. Consequently, from a performance standpoint, your load balancer may become a bottleneck in case you need to handle a very large number of connections per second. However load balancers are designed to be as efficient as possible and you should be OK in most cases.

As always questions/comments are welcome.

Using counters in Apache NiFi

You may not know it but you have the ability to define and play with counters in NiFi. If policies are correctly configured (if your NiFi is secured), you should be able to access the existing counters using the menu:

Screen Shot 2017-02-07 at 5.33.59 PM.png

Counters are just values that you can increase or decrease by a given delta. This is useful if you want to monitor particular values along your workflow. At the moment, unless you use a processor that explicitly uses counters or provides a way to define counters, there is nothing available out of the box.

The best way to define and update counters is to use ExecuteScript processor with the following piece of Groovy code:

def flowFile = session.get()
if(!flowFile) return
session.adjustCounter("my-counter", 1, true)
session.transfer(flowFile, REL_SUCCESS)

With this example, the ExecuteScript processor will just transmit the flow file without any modification to the success relationship but will also increment the counter “my-counter” by 1. If this counter does not exist it will be initialized with the delta value given as argument.

Here is the documentation of this method:

    /**
     * Adjusts counter data for the given counter name and takes care of
     * registering the counter if not already present. The adjustment occurs
     * only if and when the ProcessSession is committed.
     *
     * @param name the name of the counter
     * @param delta the delta by which to modify the counter (+ or -)
     * @param immediate if true, the counter will be updated immediately,
     *            without regard to whether the ProcessSession is commit or rolled back;
     *            otherwise, the counter will be incremented only if and when the
     *            ProcessSession is committed.
     */
    void adjustCounter(String name, long delta, boolean immediate);
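The difference made by the immediate flag can be modelled in a few lines of Python. This is only a sketch of the semantics described in the Javadoc above, not of the NiFi implementation; the CounterSession class and the counter names are hypothetical.

```python
class CounterSession:
    """Toy model of a session adjusting counters, immediately or on commit."""

    def __init__(self, counters):
        self._counters = counters  # shared name -> value mapping
        self._pending = {}         # adjustments waiting for a commit

    def adjust_counter(self, name, delta, immediate):
        # Immediate adjustments bypass the session outcome entirely.
        target = self._counters if immediate else self._pending
        target[name] = target.get(name, 0) + delta

    def commit(self):
        for name, delta in self._pending.items():
            self._counters[name] = self._counters.get(name, 0) + delta
        self._pending.clear()

    def rollback(self):
        # Deferred adjustments are simply dropped.
        self._pending.clear()


counters = {}
session = CounterSession(counters)

session.adjust_counter("my-counter", 1, True)   # applied right away
session.adjust_counter("deferred", 1, False)    # waits for a commit
session.rollback()
after_rollback = dict(counters)

session.adjust_counter("deferred", 2, False)
session.commit()
after_commit = dict(counters)
```

After the rollback only “my-counter” exists, which is the behaviour to keep in mind when a script passes true as the last argument: the counter moves even if the flow file is never actually transferred.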

Let’s see an example: I want to confirm the correct behavior of the GetHDFS processor when I have multiple instances of this processor looking into the same directory but getting different flow files based on a regular expression.

Here is the first part of my flow:

screen-shot-2017-02-07-at-6-20-52-pm

Basically, I am generating flow files every 1ms with GenerateFlowFile. The generated flow files will be named after the generation date timestamp (without any extension). I am sending the files into HDFS and then I’m using a RouteOnAttribute where I check the filename according to a regular expression to split files according to even and uneven names. This way I can increment counters tracking the number of files I sent to HDFS with even names and with uneven names.
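The routing logic can be sketched in Python as follows. The regular expression is an assumption (a numeric name is considered even when its last digit is even), as are the sample file names; only the counter names are the ones used in this flow.

```python
import re

# Assumed rule: a numeric name is "even" when its last digit is even.
EVEN_NAME = re.compile(r"^\d*[02468]$")


def route(filename):
    """Return the relationship a file name would be routed to."""
    return "even" if EVEN_NAME.match(filename) else "uneven"


# Made-up timestamp-like names, as GenerateFlowFile would produce.
filenames = ["1486485652000", "1486485652001", "1486485652002", "1486485652003"]

producer_counters = {"even-producer": 0, "uneven-producer": 0}
for name in filenames:
    producer_counters[route(name) + "-producer"] += 1
```

Each file name matches exactly one of the two routes, so the two counters always add up to the number of files sent to HDFS.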

Here is the second part of my flow:

Screen Shot 2017-02-07 at 6.27.28 PM.png

I have two instances of GetHDFS processor configured to look into the same input directory but one with a regular expression to look for files with an even name, and one to look for files with an uneven name, this way there is no concurrent access. Besides, the processor is configured to delete the file on HDFS once the file is retrieved in NiFi. Then I update two different counters to track the number of files with an even name that I retrieved from HDFS, and one for files with an uneven name.

If everything is working correctly, I should be able to let my workflow run for a bit, then stop the generation of flow files, wait for all the flow files to be processed and confirm that:

  • even-producer counter is equal to even-consumer counter
  • uneven-producer counter is equal to uneven-consumer counter

Let’s have a look into our counters table:

screen-shot-2017-02-07-at-6-47-02-pm

It looks like we are all good 😉

As a remark, if you have multiple processors updating the same counter, then you will have the global value of the counter but also the value at each processor level. For example, if I have:

screen-shot-2017-02-07-at-6-53-09-pm

With both ExecuteScript incrementing the same “test” counter, then, I’ll have:

screen-shot-2017-02-07-at-6-55-48-pm

Also, as a last remark, you can notice that it’s possible to reset a counter to 0 from the counters table with the button in the last column (assuming you have write access to the counters based on the defined policies). It can be useful when doing some tests.

As always, questions/comments are welcome!

NiFi and OAuth 2.0 to request WordPress API

A lot of famous websites are allowing you to develop custom applications to interact with their API. In a previous example, we saw how to use NiFi to perform OAuth 1.0A authentication against Flickr API. However a lot of websites are using OAuth 2.0 mechanism to authenticate your applications. You can find more details here, and check the differences between the two versions here.

Since this blog is hosted and powered by WordPress, and since WordPress allows you to develop applications and uses OAuth 2.0 as its authentication mechanism, let’s try to get the statistics of my blog using NiFi.

Before going into the details, let’s recap the behavior in play with the example of WordPress: a user A develops an application X, and this application X is running on the Internet. Then a user B accesses the application X. This application X asks B to grant a set of permissions so that it can access WordPress as user B. If user B accepts, then application X can interact with WordPress as user B.

This is something you must have experienced with some applications like Facebook, Google, Instagram, LinkedIn, etc… All asking your permissions to post some content in your name on other websites/applications.

Now let’s understand what is going on from an OAuth 2.0 point of view.

When user B accesses application X, the application X issues a request to WordPress saying that it is requesting access to WordPress resources. The user B will be asked to authenticate with their WordPress credentials and to approve the request of the application X, granting the application a set of permissions on the resources belonging to B. Once done, the application X will get a short-lived code from WordPress. Then the application issues another request to WordPress using this code and telling which resource the application wants to access. WordPress will then return an access token and the resource ID the application is allowed to use in API calls. At this point, the application is able to request all the API endpoints to get all the data of the given resource (a WordPress blog in this example).

OK… So now, let’s build our application using NiFi!

I’ll demonstrate something “simple”: a web service exposed by NiFi that gives users access to the stats of their blog. (in the example, it will be my blog since I’ll be connecting to my application using my credentials, but that could be any WordPress user)

Let’s define my application in WordPress so that WordPress is aware of this application and generates some secret tokens for me to identify my application. I go here and I create an application that I call NiFi. Notice that the redirect URL is http://localhost:9999/ because this is where the web service created in NiFi will be listening. This could be something online but my NiFi would then need to be exposed to the Internet.

screen-shot-2017-02-01-at-12-04-19-am

The redirect URL will be the endpoint where the user will be redirected once the user has granted access to the application to WordPress resources belonging to the user. In this case we want to send back the user to our listening web service. It might be easier to understand later with the example, don’t worry 😉

Once my application is created, WordPress gives me some information that will be particularly useful:

screen-shot-2017-02-01-at-12-07-42-am

That’s all we need on WordPress side. Let’s start building our NiFi workflow!

In the end the workflow will be:

Screen Shot 2017-02-01 at 12.24.29 AM.png

We start with a HandleHttpRequest that is listening to requests performed by the user. We specify the processor to listen on localhost:9999.

Then I use an UpdateAttribute processor to add all the “common properties” I want to access in all my processors through expression language:

Screen Shot 2017-02-01 at 12.13.51 AM.png

Then I use a RouteOnAttribute to route the request based on the URL. Indeed, I am expecting users to access my web service with the URL http://localhost:9999/getCode but WordPress will also send requests to my service when redirecting users on URL like http://localhost:9999/code=…&state.

Here is my RouteOnAttribute:

Screen Shot 2017-02-01 at 12.16.07 AM.png

When this is a request sent by a user (containing “getCode” in the URL), I use an InvokeHTTP processor to send a request to WordPress. This will give me the page where I need to send my user so that the user can authenticate and grant my application all permissions.

Based on WordPress documentation, the URL to request with a GET is:

screen-shot-2017-02-01-at-12-18-07-am
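For readers who prefer to see the URL being assembled, here is a small Python sketch. The endpoint and parameter names are the ones I understand from the WordPress.com OAuth 2.0 documentation and should be treated as assumptions; the client ID “12345” is a made-up value.

```python
from urllib.parse import urlencode

# Assumed WordPress.com authorization endpoint.
AUTHORIZE_ENDPOINT = "https://public-api.wordpress.com/oauth2/authorize"


def authorize_url(client_id, redirect_uri):
    """Build the URL where the user authenticates and grants permissions."""
    params = {
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "response_type": "code",  # ask for an authorization code
    }
    return AUTHORIZE_ENDPOINT + "?" + urlencode(params)


url = authorize_url("12345", "http://localhost:9999/")
```

Note that the redirect URL has to be URL-encoded, and it must match the one declared when creating the application in WordPress.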

And what I received from WordPress (basically the page to let the user authenticate himself) is what I return to the user through a HandleHttpResponse processor. This way the user will access the page to authenticate on WordPress and grant my application all permissions, then the user will be redirected back to my application with a URL containing the code I need to get a token (thanks to the redirect URL we defined).

When the redirection is performed, I am back to my HandleHttpRequest, but, this time, at the RouteOnAttribute, I’ll go in unmatched relationship (no “getCode” in the URL since, this time, this is the redirect URL). At this point, I use an UpdateAttribute to extract the code from the callback URL used by WordPress:

screen-shot-2017-02-01-at-12-20-34-am
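Outside of NiFi, the same extraction can be sketched in Python. The callback URL below is a hypothetical example with made-up values, and extract_code is a helper name of mine.

```python
from urllib.parse import urlsplit, parse_qs


def extract_code(callback_url):
    """Return the value of the 'code' query parameter, or None if absent."""
    query = parse_qs(urlsplit(callback_url).query)
    values = query.get("code")
    return values[0] if values else None


# Hypothetical redirect performed by WordPress after the user approved.
code = extract_code("http://localhost:9999/?code=abc123&state=xyz")

# A request coming from a user has no code to extract.
missing = extract_code("http://localhost:9999/getCode")
```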

I am now able to create the content that will be sent to WordPress in the next request using a ReplaceText processor (indeed, since it will be a POST request, I need to update the content of my FlowFile because this will be used as the body of my next HTTP request):

Screen Shot 2017-02-01 at 12.25.38 AM.png

And I can perform my POST request using an InvokeHTTP processor in which I set the content type to “application/x-www-form-urlencoded”.
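As an illustration of what such a form-encoded body looks like, here is a Python sketch. The field names follow the standard OAuth 2.0 authorization code grant as I understand WordPress expects them, so treat them as assumptions; all the values are made up.

```python
from urllib.parse import urlencode


def token_request_body(client_id, client_secret, redirect_uri, code):
    """Build the application/x-www-form-urlencoded body of the token request."""
    return urlencode({
        "client_id": client_id,
        "client_secret": client_secret,
        "redirect_uri": redirect_uri,
        "code": code,
        "grant_type": "authorization_code",
    })


body = token_request_body("12345", "s3cret", "http://localhost:9999/", "abc123")
```

This string is what the ReplaceText processor has to write into the flow file content before the POST request is sent.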

This request will give me back a JSON looking like:

Screen Shot 2017-02-01 at 12.27.00 AM.png

So I use an EvaluateJsonPath processor to extract the blog ID and the access token:

Screen Shot 2017-02-01 at 12.27.53 AM.png

And I am now able to perform my last request with an InvokeHTTP processor to request the WordPress API endpoint that returns the statistics associated with the blog ID:

Screen Shot 2017-02-01 at 12.28.58 AM.png

And I add a property to specify my access token as a header property:

Screen Shot 2017-02-01 at 12.29.33 AM.png
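Put together, the last two steps (reading the token response and calling the API with the token) can be sketched in Python. The JSON field names, the stats endpoint and the Bearer header format are assumptions based on my reading of the WordPress.com API; the sample response is made up and the request is only built, not sent.

```python
import json
from urllib.request import Request


def stats_request(token_response_text):
    """Build the stats request from the JSON returned by the token endpoint."""
    token = json.loads(token_response_text)
    # Assumed endpoint: site statistics, addressed by blog ID.
    url = f"https://public-api.wordpress.com/rest/v1.1/sites/{token['blog_id']}/stats"
    # The access token travels in the Authorization header.
    return Request(url, headers={"Authorization": f"Bearer {token['access_token']}"})


# Made-up token response with the two fields we care about.
sample = '{"access_token": "TOKEN", "token_type": "bearer", "blog_id": "42"}'
request = stats_request(sample)
```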

Then I send back the result to a HandleHttpResponse to display the result of the request to the user. Obviously at this point we could do something nicer with the statistics and display some charts for example… but that’s outside the purpose of this blog: I just return the JSON containing the statistics 🙂

That’s all! Let’s now see what it looks like when connecting to the web service while the full flow is running:

When I go to http://localhost:9999/getCode

I get to this page:

Screen Shot 2017-02-01 at 12.32.23 AM.png

I enter my credentials and, since I have two-step authentication enabled, I land on a web page asking for another access code that I received on my smartphone. Once the code is entered, I am asked to grant permissions to the application:

Screen Shot 2017-02-01 at 12.35.44 AM.png

Then I approve, and I finally get the statistics in a JSON:

Screen Shot 2017-02-01 at 12.36.55 AM.png

That’s pretty cool, isn’t it? The template is available here.

Now I’m sure you can imagine a lot of great applications using OAuth 2.0 mechanism to interact with various existing APIs!

As always, comments and questions are welcome! I hope you enjoyed this blog!