Monitoring NiFi – Bootstrap notifier

Note – This article is part of a series discussing subjects around NiFi monitoring.

When NiFi is running, there is more than one process running on the server. Let’s have a look:

[root@pvillard-hdf-2 ~]# ps -ef | grep nifi
nifi     19380     1  0 12:33 ?        00:00:00 /bin/sh /usr/hdf/current/nifi/bin/nifi.sh start
nifi     19382 19380  0 12:33 ?        00:00:06 /usr/java/jdk1.8.0_131//bin/java -cp /usr/hdf/current/nifi/conf:/usr/hdf/current/nifi/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.log.dir=/var/log/nifi -Dorg.apache.nifi.bootstrap.config.pid.dir=/var/run/nifi -Dorg.apache.nifi.bootstrap.config.file=/usr/hdf/current/nifi/conf/bootstrap.conf org.apache.nifi.bootstrap.RunNiFi start
nifi     19419 19382  7 12:33 ?        00:07:36 /usr/java/jdk1.8.0_131/bin/java -classpath /usr/hdf/current/nifi/conf:/usr/hdf/current/nifi/lib/jcl-over-slf4j-1.7.12.jar:/usr/hdf/current/nifi/lib/jul-to-slf4j-1.7.12.jar:/usr/hdf/current/nifi/lib/log4j-over-slf4j-1.7.12.jar:/usr/hdf/current/nifi/lib/logback-classic-1.1.3.jar:/usr/hdf/current/nifi/lib/logback-core-1.1.3.jar:/usr/hdf/current/nifi/lib/nifi-runtime-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-api-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-documentation-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-framework-api-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-nar-utils-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/slf4j-api-1.7.12.jar:/usr/hdf/current/nifi/lib/nifi-properties-1.1.0.2.1.2.0-10.jar -Dorg.apache.jasper.compiler.disablejsr199=true -Xmx512m -Xms512m -Dambari.application.id=nifi -Dambari.metrics.collector.url=http://pvillard-hdf-1:6188/ws/v1/timeline/metrics -Dsun.net.http.allowRestrictedHeaders=true -Djava.net.preferIPv4Stack=true -Djava.awt.headless=true -XX:+UseG1GC -Djava.protocol.handler.pkgs=sun.net.www.protocol -Dnifi.properties.file.path=/usr/hdf/current/nifi/conf/nifi.properties -Dnifi.bootstrap.listen.port=39585 -Dapp=NiFi -Dorg.apache.nifi.bootstrap.config.log.dir=/var/log/nifi org.apache.nifi.NiFi -k A2EA52795B33AB2F21C93E7E820D08369F1448478C877F4C710D6E85FD904AE6

As you can see, the script is running a bootstrap (running a JVM between 12 and 24MB) that is in charge of the NiFi JVM itself. In this example, the script is running with the PID 19380 and is the parent of the bootstrap process running with PID 19382. The bootstrap itself is the parent of NiFi running with the PID 19419.

If you only kill the NiFi process, the bootstrap will detect it and automatically relaunch NiFi for you:

kill -9 19419

Then I have:

nifi     19380     1  0 12:33 ?        00:00:00 /bin/sh /usr/hdf/current/nifi/bin/nifi.sh start
nifi     19382 19380  0 12:33 ?        00:00:06 /usr/java/jdk1.8.0_131//bin/java -cp /usr/hdf/current/nifi/conf:/usr/hdf/current/nifi/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.log.dir=/var/log/nifi -Dorg.apache.nifi.bootstrap.config.pid.dir=/var/run/nifi -Dorg.apache.nifi.bootstrap.config.file=/usr/hdf/current/nifi/conf/bootstrap.conf org.apache.nifi.bootstrap.RunNiFi start
nifi     24702 19382 99 14:20 ?        00:00:05 /usr/java/jdk1.8.0_131/bin/java -classpath /usr/hdf/current/nifi/conf:/usr/hdf/current/nifi/lib/jcl-over-slf4j-1.7.12.jar:/usr/hdf/current/nifi/lib/jul-to-slf4j-1.7.12.jar:/usr/hdf/current/nifi/lib/log4j-over-slf4j-1.7.12.jar:/usr/hdf/current/nifi/lib/logback-classic-1.1.3.jar:/usr/hdf/current/nifi/lib/logback-core-1.1.3.jar:/usr/hdf/current/nifi/lib/nifi-runtime-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-api-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-documentation-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-framework-api-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-nar-utils-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/slf4j-api-1.7.12.jar:/usr/hdf/current/nifi/lib/nifi-properties-1.1.0.2.1.2.0-10.jar -Dorg.apache.jasper.compiler.disablejsr199=true -Xmx512m -Xms512m -Dambari.application.id=nifi -Dambari.metrics.collector.url=http://pvillard-hdf-1:6188/ws/v1/timeline/metrics -Dsun.net.http.allowRestrictedHeaders=true -Djava.net.preferIPv4Stack=true -Djava.awt.headless=true -XX:+UseG1GC -Djava.protocol.handler.pkgs=sun.net.www.protocol -Dnifi.properties.file.path=/usr/hdf/current/nifi/conf/nifi.properties -Dnifi.bootstrap.listen.port=39585 -Dapp=NiFi -Dorg.apache.nifi.bootstrap.config.log.dir=/var/log/nifi org.apache.nifi.NiFi -k A2EA52795B33AB2F21C93E7E820D08369F1448478C877F4C710D6E85FD904AE6

As you can see the NiFi process has a new PID since it is a new process launched by the bootstrap.

You can find more information about the bootstrap system in the administration guide. One interesting feature with this bootstrap approach is the bootstrap notifier. It allows you to configure a notification service that will be triggered when the bootstrap starts, stops or detects an interruption of the NiFi process.

With the new version of Apache NiFi (1.2.0), you can send notification to an HTTP(S) endpoint. If you need a custom notification service (example: send SNMP traps), it’s not that hard: you just need to extend AbstractNotificationService. You can have a look at the two existing implementations: email notifier and HTTP notifier.

Let’s configure our NiFi to use the email notification service and see what is the result. On each node of our cluster, we need to update the bootstrap.conf configuration file as below:

###
# Notification Services for notifying interested parties when NiFi is stopped, started, dies
###

# XML File that contains the definitions of the notification services
notification.services.file=./conf/bootstrap-notification-services.xml

# In the case that we are unable to send a notification for an event, how many times should we retry?
notification.max.attempts=5

# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi is started?
nifi.start.notification.services=email-notification

# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi is stopped?
nifi.stop.notification.services=email-notification

# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi dies?
nifi.dead.notification.services=email-notification

In this case, we are saying that the configuration of our notification services (we can define and use multiple notifiers) is in the file

./conf/bootstrap-notification-services.xml

and that we want to use the notifier called “email-notification” (that’s the default name defined in the XML configuration file) for stop, start, and dead events. As stated in the documentation, it’s possible to define a list of notifiers for each type of event.

For this demonstration, I’ll use my Gmail account and the Gmail SMTP server to send the notifications (obviously, with this example, NiFi needs a public internet access to send the requests to the SMTP server). Here is the configuration file:

<services>
     <service>
        <id>email-notification</id>
        <class>org.apache.nifi.bootstrap.notification.email.EmailNotificationService</class>
        <property name="SMTP Hostname">smtp.gmail.com</property>
        <property name="SMTP Port">587</property>
        <property name="SMTP Username">my-email-address@gmail.com</property>
        <property name="SMTP Password">myPassword</property>
        <property name="SMTP TLS">true</property>
        <property name="From">"NiFi Service Notifier"</property>
        <property name="To">email address that will receive the notifications</property>
     </service>
</services>

Here are the emails I received when restarting NiFi or killing the NiFi process:

  • Start event

Title: NiFi Started on Host pvillard-hdf-2 (172.26.249.33)

Hello,

Apache NiFi has been started on host pvillard-hdf-2 (172.26.249.33) at 2017/04/27 22:15:01.376 by user nifi

  • Stop event

Title: NiFi Stopped on Host pvillard-hdf-2 (172.26.249.33)

Hello,

Apache NiFi has been told to initiate a shutdown on host pvillard-hdf-2 (172.26.249.33) at 2017/04/27 22:14:35.702 by user nifi

  • Dead event

Title: NiFi Died on Host pvillard-hdf-2 (172.26.249.33)

Hello,

It appears that Apache NiFi has died on host pvillard-hdf-2 (172.26.249.33) at 2017/04/28 09:52:01.973; automatically restarting NiFi

OK, now let’s see how to configure the HTTP notification service. Let’s say I have a web server listening here:

http://pvillard-hdf-4:9999/notification

My XML configuration is now:

<services>
     <service>
        <id>email-notification</id>
        <class>org.apache.nifi.bootstrap.notification.email.EmailNotificationService</class>
        <property name="SMTP Hostname">smtp.gmail.com</property>
        <property name="SMTP Port">587</property>
        <property name="SMTP Username">my-email-address@gmail.com</property>
        <property name="SMTP Password">myPassword</property>
        <property name="SMTP TLS">true</property>
        <property name="From">"NiFi Service Notifier"</property>
        <property name="To">email address that will receive the notifications</property>
     </service>
     <service>
        <id>http-notification</id>
        <class>org.apache.nifi.bootstrap.notification.http.HttpNotificationService</class>
        <property name="URL">http://pvillard-hdf-4:9999/notification</property>
     </service>
</services>

And my bootstrap.conf configuration file now contains:

###
# Notification Services for notifying interested parties when NiFi is stopped, started, dies
###

# XML File that contains the definitions of the notification services
notification.services.file=./conf/bootstrap-notification-services.xml

# In the case that we are unable to send a notification for an event, how many times should we retry?
notification.max.attempts=5

# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi is started?
nifi.start.notification.services=email-notification,http-notification

# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi is stopped?
nifi.stop.notification.services=email-notification,http-notification

# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi dies?
nifi.dead.notification.services=email-notification,http-notification

I now restart NiFi, and I can confirm that I receive a notification (I used a standalone NiFi to confirm the reception of the notification):

Screen Shot 2017-05-02 at 5.15.29 PM

The content of the notification is the same as with the email notification service. Note that it’s also possible to configure properties for a keystore and a truststore to send notifications using HTTPS:

<service>
   <id>http-notification</id>
   <class>org.apache.nifi.bootstrap.notification.http.HttpNotificationService</class>

   /* The URL to send the notification to */
   <property name="URL"></property>

   /* Max wait time for connection to remote service - default is 10s */
   <property name="Connection timeout"></property>

   /* Max wait time for remote service to read the request sent - default is 10s */
   <property name="Write timeout"></property>

   /* The fully-qualified filename of the Truststore */
   <property name="Truststore Filename"></property>

   /* The Type of the Truststore. Either JKS or PKCS12 */
   <property name="Truststore Type"></property>

   /* The password for the Truststore */
   <property name="Truststore Password"></property>

   /* The fully-qualified filename of the Keystore */
   <property name="Keystore Filename"></property>

   /* The Type of the Keystore. Either JKS or PKCS12 */
   <property name="Keystore Type"></property>

   /* The password for the Keystore */
   <property name="Keystore Password"></property>

   /* The password for the key. If this is not specified, but the Keystore Filename, Password, and Type are specified, then the Keystore Password will be assumed to be the same as the Key Password */
   <property name="Key Password"></property>

   /* The algorithm to use for this SSL context. Either TLS or SSL */
   <property name="SSL Protocol"></property>
</service>

Note that I also received an email notification since I specified the two notifiers in the configuration.

Again, if you need a custom notifier, this should not be really difficult and you are more than welcome to contribute your notifier code to the Apache community. It will give more options to users when setting a bootstrap notifier.

As usual feel free to ask questions and comment this post.

URL shortener service with Apache NiFi

This blog will demonstrate a new use case using Apache NiFi: implement a URL shortener service. Let’s be clear right now, I don’t think Apache NiFi is the best option to propose such a service (this is not the idea behind this Apache project) but I believe this is an opportunity to play around with some processors/functionalities I never discussed so far.

Why this idea? Some months ago, for a job interview, I have been asked to develop a URL shortener service in Go using Docker and Redis. This is available on Github here. Since this is something we can do with Apache NiFi, it is interesting to see how this can be achieved.

Before talking about Apache NiFi (if you don’t know about this project, have a look on my previous posts, this is a great tool!), let’s discuss what we want to achieve…

  • URL shortener service requirements

I want to expose a web service that gives me the opportunity to shorten long URLs in order to be able to share/remember it easily. I also want to store statistics about the number of times my shortened URL has been used. And, in our use case, I want my shortened URL to be valid at least 24h.

  • Example

I want to shorten this URL:

https://pierrevillard.com/2016/04/04/analyze-flickr-account-using-apache/

Let’s say my service is running on my local computer, I am going to access it with my browser:

localhost/shortlink?url=https://pierrevillard.com/2016/04/04/analyze-flickr-account-using-apache/

I will get a web page displaying JSON data:

{"key":"6974","url":"https://pierrevillard.com/2016/04/04/analyze-flickr-account-using-apache/","date":1460238097015,"count":0}

From now on, I will be able to access my URL by using:

localhost/6974

And I will be able to access the statistics (JSON data) of my shortened URL at:

localhost/admin?key=6974

  • Implementation with Apache NiFi

This use case gives me the opportunity to discuss about some nice features of Apache NiFi:

  1. The possibility to expose web services with the use of HandleHttpRequest and HandleHttpResponse processors in combination with a StandardHttpContextMap controller service.The controller service provides the ability to store and retrieve HTTP requests and responses external to a Processor, so that multiple processors can interact with the same HTTP request. In other words, the HandleHttpRequest processor initializes a Jetty web server listening for requests on a given port. Once a request is received, a FlowFile is generated with attributes and content (if any). This request has been received asynchronously so that the FlowFile can be used along a data flow before generating the response to send back to the user using the HandleHttpResponse processor.

    Note: at this moment, for a given listening port, there can be only one instance of the HandleHttpRequest processor on the canvas. Handling different services with the same processor can be performed with the addition of a RouteOnAttribute processor (we’ll see that in this implementation).

  2. The possibility to store information across the NiFi cluster (but also to have a distributed map cache of key/value data) using PutDistributedMapCache and FetchDistributedMapCache processors in combination with a DistributedMapCacheServer and DistributedMapCacheClientService controller services.The map cache server controller service provides a map (key/value) cache that can be accessed over a socket. Interaction with this service is typically accomplished via a DistributedMapCacheClient service. This feature is mainly used to share information across a NiFi cluster but can also be used at local level to store data in memory to be used along the flow.

    It has to be noted that, when using the PutDistributedMapCache processor, the key is given through the processor properties and the value is the content of the incoming FlowFile.

Let’s now describe the data flow I have created for the URL shortener service. First I have a single HandleHttpRequest processor with default configuration and a default StandardHttpContextMap controller service.

Then I use a RouteOnAttribute processor to define which URL has been accessed and to route the FlowFile (FF) accordingly. In the incoming FF, the attribute ‘http.request.uri’ contains the requested URL.

routeOnAttribute

At this point, I have three routes for my FF: one for the /shortlink requests, one for the /admin requests and one for the others.

Let’s start with the ‘admin’ flow part. When I receive a request at:

/admin?key=<key>

My incoming FF will have an attribute ‘http.query.param.key’ with the value <key>. This is useful to retrieve all the arguments passed along the URL.

I take the decision to use the map cache server as follow: the key will be the key value of my JSON data, and the value will be the JSON string itself. Consequently, I use a FetchDistributedMapCache processor to retrieve the JSON data associated to the given key.

fetchdistributedmapcache

If I don’t find anything, I use a ReplaceText processor to set an arbitrary error message as the content of my FF and then a HandleHttpResponse processor with the 500 HTTP error code. This way, the user will see the error message:

No URL found for key=6666

If I find an entry in my cache server, then the FF content is now filled in with my JSON string and I just need to route my FF to a HandleHttpResponse processor with a 200 HTTP code. This way, the user will see the JSON string with related information.

Let’s continue with the “shortlink” part of the flow. First of all I use a RouteOnAttribute processor to check that the URL provided at:

/shortlink?url=<URL>

is valid given a regular expression. If not I display an error message to the user with the combination of ReplaceText processor and HandleHttpResponse processor (as explained above).

If the URL is valid, I want to generate a key associated to this URL.

Note: for this part, I made the choice to keep it really simple and there are a lot of possible improvements/optimizations.

I make the decision that the key will be a 4-digits number and I create this number using time manipulation with the expression language. With an UpdateAttribute processor, I generate a ‘key’ attribute:

keyGeneration

Once the key is generated I use a FetchDistributedMapCache processor to check if this key is already used or not.

If yes, my FF now contains the associated JSON string. I use an EvaluateJsonPath processor to extract the creation date information from the JSON string and then I use a RouteOnAttribute processor to check if this creation date if below the threshold of 24 hours. If yes, I route my FF back to the UpdateAttribute processor to generate a new random key, if no, it means the key can be overwritten and I route my FF to the next steps.

Once I have a generated key that is free to use, I use a ReplaceText processor to construct my JSON string:

JSONconstruction

Then I store this information in the cache using a PutDistributedMapCache processor:

putdistributedmapcache

Note the “replace if present” property in case we are overwriting an already existing key that is too old. Then I just route my FF to a HandleHttpResponse processor with a 200 HTTP code to display to the user the JSON string.

As I said, this part is simple, there are a lot of possible improvements such as (but not limited to):

  • Have a text-based key with the possibility for the user to customize it in order to expand the number of possible shortened URL stored.
  • Use the cache server to store a sequence ID for the generated key in order to avoid randomness and possible loops in the flow.
  • Add the possibility to reuse the same key for two identical URLs to shorten.

Finally, let’s complete our use case with the last part of the flow: when a HTTP request is received which is not shortlink/admin.

The accessed URL is obtained using the FF attributes, and I can directly use a FetchDistributedMapCache processor:

fetchforredirection

If I don’t find any entry in my cache, I route the FF to a combination of ReplaceText and HandleHttpResponse processors to display an error message to the user with a 404 HTTP error code.

If I find a match, I use an EvaluateJsonPath processor to extract the counter value and the long URL from the JSON string retrieved in the cache. Then I first route my FF to a HandleHttpResponse processor with a 307 HTTP code (temporary redirection) and I add a HTTP header property to redirect the user to the corresponding URL:

httpresponse

Note: I use the 307 HTTP code to avoid my browser to cache the redirection and to perform the request each time I access my shortened URL.

I also route my FF to a ReplaceText processor in order to increment the count value in my JSON string and I use a PutDistributedMapCache processor to update the data in the cache.

That’s it! We now have a running URL shortener service with Apache NiFi. The flow is available as a template here. As always, feel free to comment and/or ask questions about this post.