FoD Paris Jan 18′ – NiFi Registry and workflow monitoring with a use case

I got the chance to talk a bit about NiFi during the last Future of Data meetup in Paris, and I wanted to share in a blog post what I presented during this event.

First of all, I want to remind you that Apache NiFi 1.5.0, MiNiFi 0.4.0 and NiFi Registry 0.1.0 are out. Together, these releases represent a huge step forward for the community, as they bring the flow development life cycle (FDLC) to a completely new level. Besides, this first version of the Registry is a solid foundation for what will come in the future.

OK, let’s get back to what I discussed at the meetup.

  • Introduction to NiFi Registry

What is the NiFi Registry? It’s a new component of the NiFi ecosystem: a web-based application that is meant to be used as a central location for storage and management of shared resources across one or more instances of NiFi and/or MiNiFi.

In a typical multi-cluster environment, this could be represented as below:

Screen Shot 2018-02-02 at 2.03.45 PM.png

At the moment, the integration between NiFi and the NiFi Registry allows you to store, retrieve, and upgrade versioned flows. Future versions will allow users to store additional types of resources. And if you’re asking… yes, the NiFi Registry can be secured and provides authorization support to control users and access policies to the versioned flows.

Note that, even if you have a single environment, using the NiFi Registry will also make a lot of sense. Firstly, it’s a convenient way to save your work and to version it (and to be honest, this argument alone should convince you), but it’s also a practical way to share common reusable parts of flows between multiple workflows.

For example, let’s say you have a process group designed to perform a very common task like transforming XML data to JSON and adding attributes to the flow files. If you have to update this process group with a few changes and it is used in multiple places, you don’t need to copy/paste your changes everywhere. You just have to version it and, as soon as you update it, you’ll be able to update all the other instances of this process group to the latest version.

  • NiFi monitoring NiFi

I have already posted a few blogs about NiFi monitoring, but it’s a recurring subject and there are quite a lot of options to meet whatever monitoring requirements you might have.

What do we mean by “monitoring”? In my opinion, we have two main subjects:

  1. Dashboarding: expose various metrics data to measure the application performance and stability.
  2. Alerting: send notifications when something is wrong.

In the context of NiFi, we can also have two types of monitoring:

  1. “Technical” monitoring: is the NiFi service working as expected? are the nodes up and alive?
  2. “Business” monitoring: is a particular workflow for a given project behaving as expected?

For both types of monitoring, my recommendation is to use the reporting tasks available in NiFi. Reporting tasks provide an efficient way to extract and process the data generated by the framework, for technical metrics as well as workflow-related metrics.

  • Use case study

I’m going to articulate this blog around a very simple use case that will demonstrate the use of the NiFi Registry to perform FDLC tasks. And I’ll also use it as a baseline to show various ways of monitoring both the NiFi service and a particular workflow.

The use case is to expose an HTTP REST endpoint using NiFi so that an application can send data to NiFi through HTTP calls. The idea is to route the received data based on the URI used in the HTTP call. For each type of data, I’ll perform XML validation against a schema, convert XML to JSON, apply simple data transformations and send the data to a Kafka topic.

  • NiFi monitoring: survival kit

Before going into the details of the use case, I’ll give my personal opinion of what should be the minimum setup for efficient NiFi monitoring. Here are the 6 reporting tasks that should be configured:

Screen Shot 2018-02-02 at 3.15.03 PM.png

# The Ambari reporting task is used to send metrics to the Ambari Metrics Service (obviously, this reporting task only makes sense if you have Ambari and AMS available, and, if not, there are similar reporting tasks for different systems). When using the reporting task, the following metrics will be available in Ambari on the NiFi service page:

Screen Shot 2018-02-02 at 3.51.48 PM.png

Things to monitor carefully are:

  1. JVM heap usage to avoid OutOfMemory errors. Even though NiFi does not require a lot of RAM, some processors/use cases might need more. It also depends on how many flow files NiFi is going to handle at one time.
  2. Active threads is the mean number of active threads per node in the NiFi cluster. By default, each NiFi node will initialize a pool of 10 threads for timer driven components. If you see that this metric is always equal to the size of the thread pool, then you might need to increase the thread pool size. But this number needs to be changed with care in coordination with CPU/load metrics of your NiFi nodes. To change the thread pool size, go into Controller Settings / General (change will be immediately effective, no restart required).
  3. Flow files queued is the number of flow files currently queued in NiFi, waiting to be processed. If this graph is continually increasing, it can be a symptom of something going wrong. Peaks at specific hours of the day can be totally expected depending on the use cases, and NiFi will protect itself using the backpressure mechanism but, in any case, it is always good to keep an eye on this metric.
  4. Total task duration can also be something to look at: if this graph is constantly going higher, it means that it takes more and more time to process the data. This can be expected if data volumes are changing or if workflows are updated, but it can also be a symptom of a processor not performing well.

# The MonitorDiskUsage reporting tasks are useful to be sure that bulletins will be emitted before having a “no space left on device” error message. I recommend setting one per repository (at least 3, but it can be more if you have multiple content/provenance repositories). In my screenshot, I defined one for provenance repository, one for content repository and one for flow file repository.

# The Site to Site bulletin reporting task is used to reingest all the generated bulletins into NiFi. Generally speaking, Site to Site reporting tasks are a really powerful tool, as they allow you to reingest data generated by NiFi’s framework into NiFi itself so you can use all the processors you want to process this data. With this specific reporting task, you’re able to handle every generated bulletin in a dedicated workflow and do whatever suits you: email notification, dashboarding, automatic issue creation in JIRA, etc.

# The Site to Site status reporting task is used to frequently take a snapshot of what is going on in all the workflows existing on the canvas. It will generate a JSON array of JSON documents representing the status of every component (processors, ports, connections, etc.) including the statistics for the last 5 minutes.

  • NiFi monitoring NiFi

Using strict naming conventions, you can leverage generic workflows dedicated to monitoring and perform quite powerful things. My approach is really to use NiFi as a way to monitor itself. You might ask: what is going to monitor the workflows dedicated to monitoring? To be honest, if there is something wrong going on with the monitoring workflows, you will detect it very quickly through the technical monitoring / Ambari reporting task. Or the NiFi service will just be red in Ambari…

Consequently, my personal recommendation is to ask each project team to respect a strict naming convention and to deliver a workflow designed to fulfill the monitoring requirements for the project workflows. Each workflow will certainly require different notification mechanisms, and different metrics in the dashboards.

At the root level of the canvas, I’ll have one process group per project in order to ensure proper authorization policies. I’ll also have a process group dedicated to monitoring, and two input ports to receive the data sent by reporting tasks (with NIFI-4814, it’ll be possible to only use one):

Screen Shot 2018-02-07 at 2.34.36 PM

In this use case study, I assume that the naming convention is the following: <acronym of the project>_<meaningful component name>

Besides, if the metrics of a component should be captured for dashboarding, the name should be suffixed by _GRAFANA.
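To make the convention concrete, here is a minimal Python sketch of how a component name could be split into its project acronym and component name, and how the _GRAFANA suffix could be detected. In my flow this is done with UpdateRecord, not Python, and the function and class names below are hypothetical, only there for illustration.

```python
from typing import NamedTuple, Optional

GRAFANA_SUFFIX = "_GRAFANA"


class ParsedName(NamedTuple):
    project: Optional[str]  # acronym before the first underscore, or None
    component: str          # the meaningful component name
    export_metrics: bool    # True when the name ends with _GRAFANA


def parse_component_name(name: str) -> ParsedName:
    """Split '<acronym>_<component name>[_GRAFANA]' into its parts."""
    export = name.endswith(GRAFANA_SUFFIX)
    base = name[: -len(GRAFANA_SUFFIX)] if export else name
    project, separator, component = base.partition("_")
    if not separator or not project or not component:
        # The name does not follow the convention: no project can be derived.
        return ParsedName(None, base, export)
    return ParsedName(project, component, export)
```

A name such as "PRJ_http_200_GRAFANA" (a made-up example) would be attached to project "PRJ" and flagged for dashboarding, while a name without any underscore would not be attached to any project.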

With the above assumptions, here is what I have in my monitoring process group:

Screen Shot 2018-02-07 at 2.39.16 PM

The UpdateAttribute is used to add a “type” attribute with the value “bulletins” or “status”. I’ll use this attribute to perform routing at a later stage.

The UpdateRecord is used to populate a “project” field by extracting the acronym from the component name. Here is the schema I use for bulletins, and here is the schema I use for status.

For bulletins:

Screen Shot 2018-02-07 at 2.46.21 PM.png

For status data:

Screen Shot 2018-02-07 at 2.46.41 PM

At this point, I have bulletins and status data and I’m able to link this data to every project running in NiFi. Now I send this data to two process groups: one that is dedicated to technical monitoring (used and managed by the team in charge of the NiFi service), and one that is dedicated to business (per-project) monitoring:

Screen Shot 2018-02-07 at 2.49.20 PM

Technical monitoring

In the technical monitoring, it’s really up to you regarding what you want… What I usually do is send the bulletin data to a tool like AMS, Solr or Elasticsearch to have dedicated dashboards. I also set up email notifications when backpressure is enabled in a connection.

For bulletins dashboarding, I slightly change the schema of the data and send it into an Elasticsearch instance:

Screen Shot 2018-02-07 at 2.53.40 PM.png

I then create a dashboard in Grafana:

Screen Shot 2018-02-07 at 3.00.03 PM

If necessary I can use the notification mechanism available in Grafana to get email messages when some thresholds are reached.

With the status data, I decided to design a workflow sending email notifications as soon as backpressure is enabled in a connection:

Screen Shot 2018-02-07 at 9.28.30 PM

To do that, I use the QueryRecord processor to only select records representing connections with backpressure enabled:

Screen Shot 2018-02-07 at 9.30.32 PM.png
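For readers who prefer code to screenshots, the selection logic can be sketched in Python as below. This is only an illustration of the filtering idea, not what QueryRecord executes. The field names componentType and isBackPressureEnabled are assumptions about the status records, so check them against the schema you actually use.

```python
from typing import Any, Dict, Iterable, List


def connections_with_backpressure(
    records: Iterable[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Keep only the status records of connections with backpressure enabled."""
    selected = []
    for record in records:
        # Assumed field: the kind of component the record describes.
        if record.get("componentType") != "Connection":
            continue
        # Assumed field: may arrive as a boolean or as the string "true".
        flag = str(record.get("isBackPressureEnabled", "")).lower()
        if flag == "true":
            selected.append(record)
    return selected
```

If the returned list is not empty, the flow file is worth an email notification. Otherwise there is nothing to report.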

Again… with the Site to Site reporting tasks, you are ingesting NiFi’s own data back into NiFi, and you can use all the processors you want to process this data and fulfill your requirements.

Business (per-project) monitoring

If we go back to the first level of my monitoring process group, you may have noticed that I’m also sending both bulletin and status data to another process group for “per-project” monitoring:

Screen Shot 2018-02-07 at 2.49.20 PM

In this process group, here is what I’m doing:

Screen Shot 2018-02-07 at 9.35.11 PM

I’m using a PartitionRecord processor to partition all the data based on the /project field that we computed earlier. This way, each flow file only contains data with a single value for the /project field, and this value is also extracted as an attribute of the flow file. Then I just have to use a RouteOnAttribute processor to route the data based on the project that generated it. If a project didn’t respect the naming convention, the project value will not match any project acronym and the data will be dropped (unless you want to process it somehow, for instance to detect teams not respecting the naming convention!). After the RouteOnAttribute, you have one process group per project, and it is this specific piece that should be developed by every project deployed in NiFi. This way, each project receives the bulletins and status data related to it and can implement its own workflow to deal with them.
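The partition-then-route idea can be sketched in a few lines of Python. This is a simplified model of what PartitionRecord and RouteOnAttribute do together in this flow, not their actual implementation. The function names and the sample project acronyms are hypothetical.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

Record = Dict[str, Any]


def partition_by_project(records: Iterable[Record]) -> Dict[Optional[str], List[Record]]:
    """Group records so that each group holds a single 'project' value."""
    partitions: Dict[Optional[str], List[Record]] = defaultdict(list)
    for record in records:
        partitions[record.get("project")].append(record)
    return dict(partitions)


def route_partitions(
    partitions: Dict[Optional[str], List[Record]], known_projects: Set[str]
) -> Tuple[Dict[str, List[Record]], List[Record]]:
    """Send each group to its project, collecting unmatched records apart."""
    routed: Dict[str, List[Record]] = {}
    unmatched: List[Record] = []
    for project, group in partitions.items():
        if project in known_projects:
            routed[project] = group
        else:
            # Naming convention not respected: drop, or inspect separately.
            unmatched.extend(group)
    return routed, unmatched
```

The unmatched list is where you would look if you wanted to detect teams not respecting the naming convention.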

Before presenting what monitoring I’m doing for the use case of this post, I’ll step back a little and present the use case itself and how I leverage the NiFi Registry to deploy my workflow from a development environment to a production environment.

  • Use case presentation

The use case itself is pretty simple and I won’t go too much into the details since it’s not the purpose of this blog. I’m using a HandleHttpRequest processor to listen for HTTP calls on a given port. Then, based on the URI called by the client, I’m routing the flow files to different process groups. I’m expecting to receive HTTP POST requests on specific endpoints, each one representing a type of data. The received data is expected to be XML and I’m checking that the data is valid according to an XSD schema. At this point, there are three options:

  1. if the client requests an endpoint that is not expected, I route the flow file to the HandleHttpResponse processor and return a 404 HTTP code.
  2. if the client posts data on the expected endpoint but the data is not valid, I route the flow file to the HandleHttpResponse processor and return a 400 HTTP code.
  3. if the received data is valid, then I return a 200 HTTP code.
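The three options boil down to a small decision function, sketched here in Python. Two assumptions to keep in mind: the validator below only checks that the body is well-formed XML, as a stand-in for the real XSD validation (the Python standard library does not validate against XSD), and the function names and the "/purchase" URI used in the usage note are hypothetical.

```python
import xml.etree.ElementTree as ET
from typing import Callable, Dict


def is_well_formed_xml(body: bytes) -> bool:
    """Stand-in validator: checks well-formedness only, not an XSD schema."""
    try:
        ET.fromstring(body)
        return True
    except ET.ParseError:
        return False


def http_status_for(
    uri: str, body: bytes, validators: Dict[str, Callable[[bytes], bool]]
) -> int:
    """Return the HTTP code for a POST, given one validator per known URI."""
    validator = validators.get(uri)
    if validator is None:
        return 404  # endpoint that is not expected
    if not validator(body):
        return 400  # expected endpoint, invalid data
    return 200      # expected endpoint, valid data
```

With validators = {"/purchase": is_well_formed_xml}, a well-formed document posted on /purchase gets a 200, a broken one gets a 400, and anything posted on another URI gets a 404.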

Screen Shot 2018-02-07 at 9.56.24 PM

Screen Shot 2018-02-07 at 9.57.00 PM.png

What I’m doing with valid data in the process group does not really matter and I won’t describe that (this post is already way too long and I’m surprised you’re still reading…).

  • Deployment with NiFi Registry

Let’s try to keep things simple. The NiFi Registry is a web application living outside of NiFi. However, NiFi integrates with the NiFi Registry. This is done by going into the Controller Settings menu where there is a new tab to define the registries to connect with:

Screen Shot 2018-02-07 at 10.01.54 PM.png

With this integration between NiFi and NiFi Registry, you have new icons in the UI you need to get familiar with:

Screen Shot 2018-02-07 at 10.04.10 PM.png

From left to right:

  1. Number of versioned Process Groups that are up to date with the Registry
  2. Number of versioned Process Groups that are locally modified (you can commit your local changes to create a new version of your process group in the Registry)
  3. Number of versioned Process Groups that are stale (there is a more recent version of the Process Group in the Registry and you can update your local process group to this version)
  4. Number of versioned Process Groups that are stale and also with local changes
  5. Number of versioned Process Groups with a synchronization failure (the Registry is not available / unreachable, etc)
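One way to remember the five icons is to see them as the result of three simple questions: is the Registry reachable, is there a newer version, and are there local changes? The Python sketch below models that reasoning. It is only my mental model, not NiFi code, and the function name and parameters are hypothetical.

```python
from enum import Enum


class VersionState(Enum):
    UP_TO_DATE = "up to date"
    LOCALLY_MODIFIED = "locally modified"
    STALE = "stale"
    LOCALLY_MODIFIED_AND_STALE = "locally modified and stale"
    SYNC_FAILURE = "sync failure"


def version_state(
    registry_reachable: bool,
    local_version: int,
    latest_version: int,
    locally_modified: bool,
) -> VersionState:
    """Derive the state of a versioned process group from three facts."""
    if not registry_reachable:
        # Without the Registry, the other two questions cannot be answered.
        return VersionState.SYNC_FAILURE
    stale = local_version < latest_version
    if stale and locally_modified:
        return VersionState.LOCALLY_MODIFIED_AND_STALE
    if stale:
        return VersionState.STALE
    if locally_modified:
        return VersionState.LOCALLY_MODIFIED
    return VersionState.UP_TO_DATE
```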

The interactions with the Registry through the NiFi UI can be done by right clicking on a process group or on the canvas inside a process group: this will show a Version menu with available commands.

Screen Shot 2018-02-07 at 10.09.36 PM.png

On the NiFi Registry side, here is what it looks like:

Screen Shot 2018-02-07 at 10.12.33 PM.png

In the Registry, it’s possible to create buckets (you’ll usually have one bucket per project). Then, from the NiFi UI, you’ll be able to start versioning a workflow in a bucket. To do that: right click on the process group, Version menu, start version control.

Screen Shot 2018-02-07 at 10.14.44 PM.png

Then you’ll be able to choose the bucket and to give some information about your workflow:

Screen Shot 2018-02-07 at 10.15.18 PM

You now have your workflow versioned in the Registry!

Screen Shot 2018-02-07 at 10.16.39 PM

Now… how do I deploy my workflow on the production cluster?

I just have to drag and drop a process group component on the canvas, and I’ll see:

Screen Shot 2018-02-07 at 10.27.14 PM.png

And I can just click on Import to select the bucket, workflow and version I want:

Screen Shot 2018-02-07 at 10.28.26 PM

I now have my workflow imported on the production cluster. Two things to do:

  1. Update the variables of the process group that are used to externalize environment-specific properties of the workflow components. Note that changes to variable values are not considered local changes from the Registry’s point of view, because two instances of the same workflow version will have different values in different environments.
  2. Start the controller services and processors. When importing a new workflow for the first time, everything is stopped to let users change the variables. When updating an existing workflow to a new version, only the new components will need to be started (no service interruption).

To access variables, you can right click on the process group and click on the Variables menu:

Screen Shot 2018-02-07 at 10.46.10 PM.png

And then you can update the variables with values corresponding to your environment:

Screen Shot 2018-02-07 at 10.46.20 PM.png

If you have multi-level process groups and if you defined variables at different levels, you will also need to update variables in sub-process groups.

  • Business monitoring for my use case

If you paid attention, in my screenshot of the NiFi Registry, you probably noticed two versioned workflows in the bucket dedicated to my use case. The other workflow is the one dedicated to the monitoring that you also noticed in a previous screenshot:

Screen Shot 2018-02-07 at 9.35.11 PM

Let’s quickly discuss what I suggest for project-specific monitoring. The first thing you might want to do is to receive email notifications when bulletins are generated by components of your workflow.

Then, what I usually suggest for per-project dashboarding is to export the statistics of specific points of the workflow to an external tool and create dashboards. As I said before, in my example, I’m exporting to the Ambari Metrics Service the last 5-minutes count and bytes sum of the flow files going through connections with a name suffixed with _GRAFANA. In my example, I renamed connections to automatically export statistics to get the number of HTTP code 200, 400 and 404:

Screen Shot 2018-02-07 at 10.40.04 PM.png

And here is my monitoring workflow for this specific use case:

Screen Shot 2018-02-07 at 10.42.43 PM.png

I route the monitoring data to have a specific processing for bulletins (email notifications in this case) and a specific processing for status. First, I use a PartitionRecord processor and a RouteOnAttribute to only extract status data for connections (I ignore data about processors, process groups, ports, etc).

Screen Shot 2018-02-07 at 10.45.39 PM.png

Then I split my JSON arrays and extract the field containing the name of the connection to only keep the connections suffixed by _GRAFANA. Then I transform the content to match the format expected by AMS and send the request to AMS (more details here).
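As a rough illustration of the filter and transform steps, here is a Python sketch that keeps the connections suffixed by _GRAFANA and builds a JSON payload for AMS. Several things here are assumptions to verify in your environment: the status field names (componentType, componentName, outputCount, outputBytes), the layout of the payload expected by the AMS collector, and the way I name the metrics. The sketch only builds the payload and does not send anything.

```python
import json
from typing import Any, Dict, Iterable, List


def build_ams_payload(
    records: Iterable[Dict[str, Any]],
    hostname: str,
    app_id: str,
    timestamp_ms: int,
) -> Dict[str, List[Dict[str, Any]]]:
    """Turn status records of *_GRAFANA connections into one AMS payload."""
    metrics = []
    for record in records:
        name = record.get("componentName", "")
        if record.get("componentType") != "Connection":
            continue  # ignore processors, process groups, ports, etc.
        if not name.endswith("_GRAFANA"):
            continue  # only connections flagged for dashboarding
        for field in ("outputCount", "outputBytes"):  # assumed field names
            metrics.append(
                {
                    "metricname": f"{name}.{field}",  # hypothetical naming
                    "appid": app_id,
                    "hostname": hostname,
                    "timestamp": timestamp_ms,
                    "starttime": timestamp_ms,
                    "metrics": {str(timestamp_ms): record.get(field, 0)},
                }
            )
    return {"metrics": metrics}


def to_json(payload: Dict[str, Any]) -> str:
    """Serialize the payload as the body of the HTTP request."""
    return json.dumps(payload)
```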

And… that’s it… I’m now able to create Grafana dashboards for my specific use case:

Screen Shot 2018-02-07 at 10.53.06 PM.png

Another thing you might want to do is to define a dedicated Ambari Reporting Task with the UUID of the process group dedicated to the project. This way you have additional statistics about this specific process group: number of active threads, number of flow files queued, volume of data received, etc.

  • What about workflow update?

OK… so I deployed my workflows in production and have great ways to monitor things. What if I have a new endpoint to handle in my workflow? Really simple! So far I only process “purchase” data, so let’s add another process group to deal with “customer” data:

Screen Shot 2018-02-07 at 11.03.15 PM

I updated my RouteOnAttribute to accept a new endpoint and added a process group to process this data. I added my HTTP 200 & 400 relationships for this new type of data and I carefully renamed the output connections to automatically get this new data into my dashboards.

I can now see that my process group does have local changes:

Screen Shot 2018-02-07 at 11.06.34 PM.png

And I can commit the changes as a new version of my workflow:

Screen Shot 2018-02-07 at 11.07.19 PM.png

Screen Shot 2018-02-07 at 11.07.40 PM

I can now check that, on production’s side, there is a new version available and I can update to this new version:

Screen Shot 2018-02-07 at 11.10.20 PM.png

I can right click on the process group and update to the latest version:

Screen Shot 2018-02-07 at 11.11.23 PM.png

I select the version I want:

Screen Shot 2018-02-07 at 11.11.33 PM

And that’s it! Note that the update from a version A to a version B will stop and restart existing components if and only if modifications have been done on the components. In my case, the HandleHttpRequest has not been modified… consequently, the update of the workflow will not cause any service interruption. That’s pretty cool!
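To illustrate why there is no service interruption, here is a small Python sketch of the reasoning: compare the components of version A and version B, and only touch what actually changed. This is a simplified model of the behavior described above, not the algorithm NiFi implements, and the component names and configuration dictionaries are made up.

```python
from typing import Any, Dict, List

Flow = Dict[str, Dict[str, Any]]  # component id -> configuration


def plan_update(current: Flow, target: Flow) -> Dict[str, List[str]]:
    """Classify components when moving from one flow version to another."""
    shared = current.keys() & target.keys()
    return {
        # New components are imported stopped and must be started by the user.
        "start": sorted(target.keys() - current.keys()),
        # Only modified components are stopped and restarted.
        "restart": sorted(c for c in shared if current[c] != target[c]),
        # Components that disappeared in the new version.
        "remove": sorted(current.keys() - target.keys()),
        # Unmodified components keep running during the update.
        "untouched": sorted(c for c in shared if current[c] == target[c]),
    }
```

In my case, HandleHttpRequest ends up in the untouched list, which is why clients keep being served during the update.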

Also note that the update will not cause any data loss in case the update includes the deletion of connections that are currently containing flow files.

And finally, I do have to update, if necessary, the variables of my newly added process group (for customer data) and to start it.

  • Conclusion

I’ve already talked about way too many things, but I hope it’ll get you excited about the NiFi Registry! Please install it and use it; it’s really going to make your life easier. As always, I invite you to join the Apache NiFi community by subscribing to the mailing lists, submitting JIRAs for NiFi, MiNiFi Java/C++ and NiFi Registry, and contributing code to NiFi, MiNiFi Java/C++ and NiFi Registry.

Feel free to comment and ask your questions on this post! Thanks for reading!

Authorizations with LDAP synchronization in Apache NiFi 1.4+

With the release of Apache NiFi 1.4.0, quite a lot of new features are available. One of them is the improved management of users and groups. Until this release, it was possible to configure an LDAP (or Active Directory) server, but it was only used during the authentication process. Once a user was authenticated, explicit policies were still required for this user to access NiFi resources. And to create a policy for a given user, it was first necessary to manually create this user in the NiFi users/groups management view. This time is now over. Users/groups management is now greatly simplified in terms of lifecycle management.

In addition to that, if you are using Apache Ranger as the external authorizer system for NiFi, you can now define rules based on LDAP groups. Before, you had to configure, in Ranger, rules explicitly based on users.

In this article, we are going to discuss how this actually works and how you can configure it.

If you’re interested in the technical details of the implementation, you can look at the corresponding JIRAs (NIFI-4032, NIFI-4059, NIFI-4127) and GitHub pull requests (#1923, #1978, #2019).

Basically, the authorizer mechanism evolved quite a bit. Before NiFi 1.4, the authorizers.xml file contained a list of configurations for any authorizer implementation you wanted to use to manage policies in NiFi. Unless you developed your own implementations, you had the choice between the FileAuthorizer (default implementation that stores the policies in a local file) and the RangerNiFiAuthorizer to use Apache Ranger as the external mechanism managing the policies.

If using the FileAuthorizer, the configuration looked like this (in a single node installation):

        <property name="Authorizations File">./conf/authorizations.xml</property>
        <property name="Users File">./conf/users.xml</property>
        <property name="Initial Admin Identity">admin</property>
        <property name="Legacy Authorized Users File"></property>

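And we set the corresponding property in the nifi.properties file (nifi.security.user.authorizer), using the identifier of the authorizer as its value.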

Starting with NiFi 1.4, the authorizers.xml file provides much more functionality (note that the changes are backward compatible and do not require any change on your side if you don’t want to change anything).

Let’s start with the new implementation of the authorizer: the Standard Managed Authorizer.

Note – there is also a new Managed Ranger Authorizer, but I won’t go into the details of this implementation in this blog. This implementation gives you the possibility to use Apache Ranger as the external system managing the authorizations but you still have access to the policies in the NiFi UI, and you can also manage additional users. It’s also this implementation that allows you to define group-based policies in Ranger.

It’s configured as below:

        <property name="Access Policy Provider">file-access-policy-provider</property>

This new implementation expects the identifier of the Access Policy Provider implementation you want to use. This new abstraction will be used to access and manage users, groups and policies… and to enforce policies when handling requests to access NiFi resources. In the above example, our authorizer is identified with the name “managed-authorizer”, and that’s the value you need to set for the nifi.security.user.authorizer property in nifi.properties to use it.

You can see that this authorizer expects a property Access Policy Provider with the identifier of the provider you want to use… Let’s move on to the Access Policy Provider. For now, there is a single implementation which is the FileAccessPolicyProvider. If you already know about the previous FileAuthorizer, you shouldn’t be very surprised by the expected properties. Here is a configuration example:

        <property name="User Group Provider">file-user-group-provider</property>
        <property name="Authorizations File">./conf/authorizations.xml</property>
        <property name="Initial Admin Identity"></property>
        <property name="Legacy Authorized Users File"></property>

        <property name="Node Identity 1"></property>

Note: as you can see the identifier of this Access Policy Provider is “file-access-policy-provider”, and that’s what we referenced in the property of the authorizer (see above).
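Since everything is wired together through identifiers, a typo in one of them is an easy mistake to make. Here is a small Python sketch that parses an authorizers.xml document and reports references pointing to an identifier that is not defined. The sample document is deliberately trimmed (no class elements, only the properties that carry references), and the helper name is hypothetical. It is a convenience check I find useful, not something NiFi provides.

```python
import xml.etree.ElementTree as ET
from typing import List, Tuple

SAMPLE = """
<authorizers>
    <userGroupProvider>
        <identifier>file-user-group-provider</identifier>
    </userGroupProvider>
    <accessPolicyProvider>
        <identifier>file-access-policy-provider</identifier>
        <property name="User Group Provider">file-user-group-provider</property>
    </accessPolicyProvider>
    <authorizer>
        <identifier>managed-authorizer</identifier>
        <property name="Access Policy Provider">file-access-policy-provider</property>
    </authorizer>
</authorizers>
"""


def dangling_references(xml_text: str) -> List[Tuple[str, str, str]]:
    """Return (owner identifier, property name, value) for unresolved references."""
    root = ET.fromstring(xml_text)
    group_ids = {e.findtext("identifier") for e in root.findall("userGroupProvider")}
    policy_ids = {e.findtext("identifier") for e in root.findall("accessPolicyProvider")}
    problems = []
    for element in root:
        owner = element.findtext("identifier") or ""
        for prop in element.findall("property"):
            name = prop.get("name", "")
            value = (prop.text or "").strip()
            if not value:
                continue  # empty properties reference nothing
            if name == "Access Policy Provider" and value not in policy_ids:
                problems.append((owner, name, value))
            elif "User Group Provider" in name and value not in group_ids:
                problems.append((owner, name, value))
    return problems
```

On the sample above the function returns an empty list. If the User Group Provider property pointed to an identifier that does not exist, the access policy provider would be reported.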

As with the FileAuthorizer, you have the Initial Admin Identity property which lets you configure the identity of the user with the admin permissions to set the first policies after a fresh install of NiFi. As the documentation says:

Initial Admin Identity – The identity of an initial admin user that will be granted access to the UI and given the ability to create additional users, groups, and policies. The value of this property could be a DN when using certificates or LDAP, or a Kerberos principal. This property will only be used when there are no other policies defined. If this property is specified then a Legacy Authorized Users File cannot be specified.
NOTE: Any identity mapping rules specified in nifi.properties will also be applied to the initial admin identity, so the value should be the unmapped identity. This identity must be found in the configured User Group Provider.

Then you still have the Legacy Authorized Users File property in case you are upgrading from a NiFi 0.x install and you want to keep your previous policies in place.

You have the Authorizations File property that defines the path to the file that will locally store all the policies. You also find the Node Identity properties in case you are in a NiFi cluster. Nothing changed on this side, but just in case, a quick reminder from the official documentation:

Node Identity [unique key] – The identity of a NiFi cluster node. When clustered, a property for each node should be defined, so that every node knows about every other node. If not clustered these properties can be ignored. The name of each property must be unique, for example for a three nodes cluster: “Node Identity A”, “Node Identity B”, “Node Identity C” or “Node Identity 1”, “Node Identity 2”, “Node Identity 3”.
NOTE: Any identity mapping rules specified in nifi.properties will also be applied to the node identities, so the values should be the unmapped identities (i.e. full DN from a certificate). This identity must be found in the configured User Group Provider.

OK… now we have a new property called “User Group Provider” and that’s where we’re going to specify the identifier of the User Group Provider to be used. This User Group Provider is a new abstraction allowing you to define how users and groups should be automatically retrieved to then define policies on them.

You have multiple implementations available:
  • CompositeUserGroupProvider
  • CompositeConfigurableUserGroupProvider
  • LdapUserGroupProvider
  • FileUserGroupProvider

As the name suggests, the CompositeUserGroupProvider implementation allows you to use multiple implementations of the User Group Provider at the same time. This is very useful, mainly because when using NiFi in clustering mode, you need to define some policies for the nodes belonging to the cluster. And, as you may know, in NiFi, nodes are considered as users. In case your nodes are not defined in your LDAP or Active Directory, you will certainly want to use the composite implementation.

Now you need to consider the CompositeConfigurableUserGroupProvider implementation which is the one you will certainly want to use in most cases. This implementation will also provide support for retrieving users and groups from multiple sources. But the huge difference is that this implementation expects a single configurable user group provider. It means that users and groups from the configurable user group provider are configurable from the UI (as you did when creating users/groups from NiFi UI in previous versions). However, users/groups loaded from one of the other User Group Providers will not be.

Note that it’s up to each User Group provider implementation to define if it is configurable or not. For instance, the LDAP User Group Provider is not configurable: NiFi is not going to manage users and groups in the LDAP/AD server.
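The difference between configurable and non-configurable sources can be pictured with a toy Python model: users from every source are visible, but only the users of the single configurable provider can be changed. This is an illustration of the concept under my own simplifications, not the NiFi implementation, and the class and method names are hypothetical.

```python
from typing import Iterable, List, Set


class CompositeConfigurableModel:
    """Toy model: one editable user source plus any number of read-only ones."""

    def __init__(self, configurable_users: Iterable[str],
                 read_only_sources: Iterable[Iterable[str]]) -> None:
        self._configurable: Set[str] = set(configurable_users)
        self._read_only: List[frozenset] = [frozenset(s) for s in read_only_sources]

    def users(self) -> Set[str]:
        """All users, whatever the source they come from."""
        merged = set(self._configurable)
        for source in self._read_only:
            merged |= source
        return merged

    def add_user(self, user: str) -> None:
        """New users always land in the configurable source."""
        self._configurable.add(user)

    def remove_user(self, user: str) -> None:
        """Only users of the configurable source can be removed."""
        if any(user in source for source in self._read_only):
            raise PermissionError(f"{user} is managed by a read-only provider")
        self._configurable.discard(user)
```

In this picture, the file-based provider plays the role of the configurable source (for example to hold the cluster nodes), while the LDAP provider is one of the read-only sources.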

A typical configuration will be the definition of the Composite Configurable User Group provider with the File User Group provider as the configurable instance and one instance of the LDAP User Group provider:

       <property name="Configurable User Group Provider">file-user-group-provider</property>
       <property name="User Group Provider 1">ldap-user-group-provider</property>

In this case, in the definition of the access policy provider, we need to change the property to use the correct user group provider:

        <property name="User Group Provider">composite-configurable-user-group-provider</property>
        <property name="Authorizations File">./conf/authorizations.xml</property>
        <property name="Initial Admin Identity"></property>
        <property name="Legacy Authorized Users File"></property>

        <property name="Node Identity 1"></property>

Now, let’s look at the File User Group provider. The objective of this provider is to provide the same functionalities as before: the user can manage users and groups from the UI and everything is stored locally in a file. Configuration looks like:

       <property name="Users File">./conf/users.xml</property>
       <property name="Legacy Authorized Users File"></property>

       <property name="Initial User Identity 1"></property>

The initial user identities are users that should be automatically populated when creating the users.xml file for the first time. Typically you would define here your initial admin identity (if this user is not defined via the LDAP user group provider). From the documentation:

Initial User Identity [unique key] – The identity of a users and systems to seed the Users File. The name of each property must be unique, for example: “Initial User Identity A”, “Initial User Identity B”, “Initial User Identity C” or “Initial User Identity 1”, “Initial User Identity 2”, “Initial User Identity 3”.

NOTE: Any identity mapping rules specified in nifi.properties will also be applied to the user identities, so the values should be the unmapped identities (i.e. full DN from a certificate).

OK… now let’s move to the last user group provider: the one allowing an automatic synchronisation of your users and groups with an LDAP/AD server. Here is the configuration part:

       <property name="Authentication Strategy">START_TLS</property>

       <property name="Manager DN"></property>
       <property name="Manager Password"></property>

       <property name="TLS - Keystore"></property>
       <property name="TLS - Keystore Password"></property>
       <property name="TLS - Keystore Type"></property>
       <property name="TLS - Truststore"></property>
       <property name="TLS - Truststore Password"></property>
       <property name="TLS - Truststore Type"></property>
       <property name="TLS - Client Auth"></property>
       <property name="TLS - Protocol"></property>
       <property name="TLS - Shutdown Gracefully"></property>

       <property name="Referral Strategy">FOLLOW</property>
       <property name="Connect Timeout">10 secs</property>
       <property name="Read Timeout">10 secs</property>

       <property name="Url"></property>
       <property name="Page Size"></property>
       <property name="Sync Interval">30 mins</property>

       <property name="User Search Base"></property>
       <property name="User Object Class">person</property>
       <property name="User Search Scope">ONE_LEVEL</property>
       <property name="User Search Filter"></property>
       <property name="User Identity Attribute"></property>
       <property name="User Group Name Attribute"></property>
       <property name="User Group Name Attribute - Referenced Group Attribute"></property>

       <property name="Group Search Base"></property>
       <property name="Group Object Class">group</property>
       <property name="Group Search Scope">ONE_LEVEL</property>
       <property name="Group Search Filter"></property>
       <property name="Group Name Attribute"></property>
       <property name="Group Member Attribute"></property>
       <property name="Group Member Attribute - Referenced User Attribute"></property>

You can find the usual parameters that you configured for the LDAP authentication part, but there are also a lot of new parameters to synchronize only specific parts of your remote LDAP/AD servers. The documentation says:

‘Url’ – Space-separated list of URLs of the LDAP servers (i.e. ldap://<hostname>:<port>).

‘Page Size’ – Sets the page size when retrieving users and groups. If not specified, no paging is performed.

‘Sync Interval’ – Duration of time between syncing users and groups (i.e. 30 mins). Minimum allowable value is 10 secs.

‘User Search Base’ – Base DN for searching for users (i.e. ou=users,o=nifi). Required to search users.

‘User Object Class’ – Object class for identifying users (i.e. person). Required if searching users.

‘User Search Scope’ – Search scope for searching users (ONE_LEVEL, OBJECT, or SUBTREE). Required if searching users.

‘User Search Filter’ – Filter for searching for users against the ‘User Search Base’ (i.e. (memberof=cn=team1,ou=groups,o=nifi) ). Optional.

‘User Identity Attribute’ – Attribute to use to extract user identity (i.e. cn). Optional. If not set, the entire DN is used.

‘User Group Name Attribute’ – Attribute to use to define group membership (i.e. memberof). Optional. If not set group membership will not be calculated through the users. Will rely on group membership being defined through ‘Group Member Attribute’ if set. The value of this property is the name of the attribute in the user ldap entry that associates them with a group. The value of that user attribute could be a dn or group name for instance. What value is expected is configured in the ‘User Group Name Attribute – Referenced Group Attribute’.

‘User Group Name Attribute – Referenced Group Attribute’ – If blank, the value of the attribute defined in ‘User Group Name Attribute’ is expected to be the full dn of the group. If not blank, this property will define the attribute of the group ldap entry that the value of the attribute defined in ‘User Group Name Attribute’ is referencing (i.e. name). Use of this property requires that ‘Group Search Base’ is also configured.

‘Group Search Base’ – Base DN for searching for groups (i.e. ou=groups,o=nifi). Required to search groups.

‘Group Object Class’ – Object class for identifying groups (i.e. groupOfNames). Required if searching groups.

‘Group Search Scope’ – Search scope for searching groups (ONE_LEVEL, OBJECT, or SUBTREE). Required if searching groups.

‘Group Search Filter’ – Filter for searching for groups against the ‘Group Search Base’. Optional.

‘Group Name Attribute’ – Attribute to use to extract group name (i.e. cn). Optional. If not set, the entire DN is used.

‘Group Member Attribute’ – Attribute to use to define group membership (i.e. member). Optional. If not set group membership will not be calculated through the groups. Will rely on group membership being defined through ‘User Group Name Attribute’ if set. The value of this property is the name of the attribute in the group ldap entry that associates them with a user. The value of that group attribute could be a dn or memberUid for instance. What value is expected is configured in the ‘Group Member Attribute – Referenced User Attribute’. (i.e. member: cn=User 1,ou=users,o=nifi vs. memberUid: user1)

‘Group Member Attribute – Referenced User Attribute’ – If blank, the value of the attribute defined in ‘Group Member Attribute’ is expected to be the full dn of the user. If not blank, this property will define the attribute of the user ldap entry that the value of the attribute defined in ‘Group Member Attribute’ is referencing (i.e. uid). Use of this property requires that ‘User Search Base’ is also configured. (i.e. member: cn=User 1,ou=users,o=nifi vs. memberUid: user1)

NOTE: Any identity mapping rules that are configured will also be applied to the user identities. Group names are not mapped.

Please find more information in the documentation here.

To summarize the new authorizers.xml file structure, I could use this image:


Screen Shot 2017-12-22 at 6.25.03 PM

Now that we have discussed the technical details, let’s demo it. I’ll re-use Apache Directory Studio to set up a local LDAP server as I did in my article about LDAP authentication with NiFi. I’ll skip the details (please refer to the article if needed) and create the following structure:

Screen Shot 2017-12-22 at 4.20.38 PM.png

In a group, I have:

Screen Shot 2017-12-22 at 4.21.43 PM

And for a user, I have:

Screen Shot 2017-12-22 at 4.22.25 PM

Note that I’m using a very bad hack because, by default, the attribute ‘memberOf’ is not available unless you define an additional objectClass. As a workaround, I’m using the ‘title’ attribute to represent the membership of a user in different groups. It’s quick and dirty, but it’ll do for this demo.

Now, here is my authorizers.xml file:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <property name="Users File">./conf/users.xml</property>
    <property name="Legacy Authorized Users File"></property>
    <property name="Initial User Identity 1"></property>

    <property name="Authentication Strategy">SIMPLE</property>

    <property name="Manager DN">uid=admin,ou=system</property>
    <property name="Manager Password">secret</property>

    <property name="Referral Strategy">FOLLOW</property>
    <property name="Connect Timeout">10 secs</property>
    <property name="Read Timeout">10 secs</property>

    <property name="Url">ldap://localhost:10389</property>
    <property name="Page Size"></property>
    <property name="Sync Interval">30 mins</property>

    <property name="User Search Base">ou=people,dc=nifi,dc=com</property>
    <property name="User Object Class">person</property>
    <property name="User Search Scope">ONE_LEVEL</property>
    <property name="User Search Filter">(title=cn=nifi,ou=groups,dc=nifi,dc=com)</property>
    <property name="User Identity Attribute">cn</property>
    <property name="User Group Name Attribute">title</property>
    <property name="User Group Name Attribute - Referenced Group Attribute"></property>

    <property name="Group Search Base"></property>
    <property name="Group Object Class">group</property>
    <property name="Group Search Scope">ONE_LEVEL</property>
    <property name="Group Search Filter"></property>
    <property name="Group Name Attribute">cn</property>
    <property name="Group Member Attribute"></property>
    <property name="Group Member Attribute - Referenced User Attribute"></property>

    <property name="Configurable User Group Provider">file-user-group-provider</property>
    <property name="User Group Provider 1">ldap-user-group-provider</property>

    <property name="User Group Provider">composite-configurable-user-group-provider</property>
    <property name="Authorizations File">./conf/authorizations.xml</property>
    <property name="Initial Admin Identity">admin</property>
    <property name="Legacy Authorized Users File"></property>
    <property name="Node Identity 1"></property>

    <property name="Access Policy Provider">file-access-policy-provider</property>

In this case, I decide to go through the users defined in my ‘people’ OU, to filter only the users belonging to the ‘nifi’ group and to use the ‘cn’ attribute as the username. I also specify that the ‘title’ attribute is the group membership of a user. This way, NiFi is able to do the mapping between the users and groups. Note that my ‘admin’ user that I defined as my initial admin identity is in my LDAP server, and I don’t need to define it in the File User Group provider definition.
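To make the selection logic concrete, here is a small Python sketch that mimics, on in-memory data, what the configuration above asks for: keep the users whose ‘title’ matches the ‘nifi’ group DN, use ‘cn’ as the identity, and read the group membership from ‘title’. The directory entries (alice, bob, the ‘other’ group) and the function name are made up for illustration. This is only a toy model of the behaviour described here, not NiFi’s actual implementation.

```python
# Toy model of the LDAP user group provider settings used in this demo.
# The entries below are hypothetical; a real directory would be queried over LDAP.
NIFI_GROUP_DN = "cn=nifi,ou=groups,dc=nifi,dc=com"
OTHER_GROUP_DN = "cn=other,ou=groups,dc=nifi,dc=com"  # hypothetical second group

people = [
    {"cn": "admin", "title": [NIFI_GROUP_DN]},
    {"cn": "alice", "title": [NIFI_GROUP_DN, OTHER_GROUP_DN]},
    {"cn": "bob", "title": [OTHER_GROUP_DN]},
]


def sync_users(entries, filter_group_dn, identity_attr="cn", group_attr="title"):
    """Return {identity: [group DNs]} for the entries matching the search filter."""
    synced = {}
    for entry in entries:
        groups = entry.get(group_attr, [])
        # Equivalent of the 'User Search Filter' (title=<group DN>).
        if filter_group_dn in groups:
            # 'User Identity Attribute' picks the user name,
            # 'User Group Name Attribute' gives the memberships.
            synced[entry[identity_attr]] = list(groups)
    return synced


users = sync_users(people, NIFI_GROUP_DN)
print(sorted(users))
```

Only the entries carrying the ‘nifi’ group DN in their ‘title’ attribute are kept, which is why the ‘admin’ user does not need to be declared in the File User Group provider.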

When starting NiFi and connecting to it as the ‘admin’ user, I can go in the Users view and I can find:

Screen Shot 2017-12-22 at 4.37.29 PM

Note that the button to add users and groups is available since I used the Composite Configurable User Group provider and defined the File User Group provider. That’s how I would specify my nodes as users if I don’t want to have the servers in my LDAP/AD.

Also note that this will automatically be synchronized with LDAP/AD based on the “Sync Interval” you specified in the authorizers configuration file.

Finally, as mentioned in the docs, remember that the order is important when using composite providers in case you have users/groups collisions between multiple sources.

With this configuration, I don’t have to care anymore about defining users and groups in NiFi and I can directly create my policies. It’s much more efficient to manage everything when people leave or change projects. Cool, isn’t it?

Let me know if you have any comment/question.

XML data processing with Apache NiFi

Note – look at the new features in NiFi 1.7+ about XML processing in this post

I recently had to work on a NiFi workflow to process millions of XML documents per day, one of the steps being the conversion of the XML data into JSON. This raises the question of performance, and I will briefly share my observations in this post.

The two most natural approaches to convert XML data with Apache NiFi are:

  • Use the TransformXML processor with an XSLT file
  • Use a scripted processor or use a custom Java processor relying on a library

There are a few XSLT files available on the internet providing a generic way to transform any XML into a JSON document. That’s really convenient and easy to use. However, depending on your use case, you might need specific features.

In my case, I’m processing a lot of XML files based on the same input schema (XSD) and I want the output to be compliant with the same Avro schema (in order to use the record-oriented processors in NiFi). The main issue is to force the generation of an array when you only have one single element in your input.

XSLT approach

Example #1:

      <Text>Some text...</Text>
      <Text>Some text...</Text>

This XML document will be converted into the following JSON:

   "MyDocument" : {
     "MyList" : {
       "MyElement" : [ {
           "Text" : "Some text...",
           "RecordID" : 1
         }, {
           "Text" : "Some text...",
           "RecordID" : 2
         } ]

Example #2:

However, if you have the following XML document:

      <Text>Some text...</Text>

The document will be converted into:

  "MyDocument" : {
    "MyList" : {
      "MyElement" : {
        "Text" : "Some text...",
        "RecordID" : 1

Force array

And here the problems start… because we don’t have the same Avro schema. That is why I recommend using the XSLT file provided by Bram Stein here on Github. It provides a way to force the creation of an array. To do that, you need to insert an attribute into the relevant tags of your XML input file. The attribute to insert is json:force-array="true".

But for this tag to be correctly interpreted, you also need to specify the corresponding namespace:


In the end, using ReplaceText processors with regular expressions, you need to have the following input (for the example #2):

<MyDocument xmlns:json="">
    <MyElement json:force-array="true">
      <Text>Some text...</Text>

And this will give you:

  "MyDocument" : {
    "MyList" : {
      "MyElement" : [ {
        "Text" : "Some text...",
        "RecordID" : 1
      } ]

And now I do have the same schema describing my JSON documents. Conclusion: you need to use regular expressions to add a namespace in the first tag of your document and add the JSON array tag in every tag wrapping elements that should be part of an array.
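As an illustration of the regular expression work described above, here is a minimal Python sketch of what the ReplaceText processors have to achieve. It is not the configuration I used in NiFi: the function name, the sample document (rebuilt from the JSON output shown earlier) and the placeholder namespace URI "urn:example:json" are assumptions made for the example, and the real namespace is the one expected by the XSLT file.

```python
import re


def add_force_array(xml, array_tags, namespace_uri):
    """Declare the json namespace on the root tag and flag the array elements."""
    # Add the namespace declaration to the first opening tag
    # (an optional <?xml ...?> prolog is skipped because '?' is not a name character).
    xml = re.sub(
        r"<([A-Za-z_][\w.\-]*)",
        lambda m: '<%s xmlns:json="%s"' % (m.group(1), namespace_uri),
        xml,
        count=1,
    )
    # Add json:force-array="true" to every opening tag listed in array_tags.
    for tag in array_tags:
        xml = re.sub(
            r"<%s(?=[\s>/])" % re.escape(tag),
            '<%s json:force-array="true"' % tag,
            xml,
        )
    return xml


# Hypothetical input with a single element in the list.
sample = (
    "<MyDocument><MyList><MyElement>"
    "<Text>Some text...</Text>"
    "</MyElement></MyList></MyDocument>"
)
result = add_force_array(sample, ["MyElement"], "urn:example:json")
print(result)
```

The list of tags to flag has to come from your schema: only the elements that can be repeated should receive the attribute.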

Java approach

Now, let’s assume you’re not afraid of using scripted processors or developing your own custom processor. Then it’s really easy to have a processor doing the same using a Java library like org.json (note that this library is *NOT* Apache friendly in terms of licensing and that’s why the following code cannot be released with Apache NiFi). Here is an example of custom processor doing the conversion. And here is a Groovy version for the ExecuteScript processor.

What about arrays with this solution? Guess what… It’s kind of similar: you have to use a ReplaceText processor before and after to ensure that arrays are arrays in the JSON output for any number of elements in your input. Also, you might have to do some other transformations like removing the namespaces or replacing empty strings with null values (by default, everything will be converted to an empty string although you might want a null record instead).

To force arrays, the easiest approach is to double every tag that should be converted into an array. With the example #2, I transform my input to have:

    <MyElement /><MyElement>
      <Text>Some text...</Text>

It’ll give me the following JSON:

  "MyDocument" : {
    "MyList" : {
      "MyElement" : [ "", {
        "Text" : "Some text...",
        "RecordID" : 1
      } ]

And, then, I can use another ReplaceText processor to remove the unwanted empty strings created by the conversion.
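The two ReplaceText steps around the conversion can be sketched in Python as follows. This only illustrates the text manipulation: the XML to JSON conversion itself (done by the library in the processor) is not reproduced, and the function names and sample strings are hypothetical.

```python
import re


def double_tags(xml, array_tags):
    """Insert an empty sibling before each opening tag that must become an array."""
    for tag in array_tags:
        xml = re.sub(
            r"<%s(?=[\s>])" % re.escape(tag),
            "<%s /><%s" % (tag, tag),
            xml,
        )
    return xml


def drop_empty_strings(json_text):
    """Remove the "" placeholders that the empty siblings produce inside arrays."""
    # Only match "" in an array position (after '[' or ','), never after ':',
    # so that legitimate empty values such as "Text" : "" are left alone.
    return re.sub(r'([\[,]\s*)"",\s*', lambda m: m.group(1), json_text)


xml_in = "<MyList><MyElement><Text>Some text...</Text></MyElement></MyList>"
doubled = double_tags(xml_in, ["MyElement"])

# What the converter would return for the doubled input (shortened sample).
json_out = '{ "MyElement" : [ "", { "Text" : "Some text..." } ] }'
cleaned = drop_empty_strings(json_out)
print(doubled)
print(cleaned)
```

A purely textual cleanup like this one can be fooled by string values that themselves contain a similar pattern, so it should be checked against your own data.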

Conclusion: with the two approaches you’ll need to be a bit intrusive in your data to get the expected results. What about the performances now?


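I remove the ReplaceText processors from the equation as I usually need the same amount of regular expression work in both cases. I want to focus only on:

  • the TransformXML processor with a generic XSLT file
  • the custom Java processor
  • the Groovy script with the ExecuteScript processor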

I’ll compare the performances of each case using input of different sizes (data generated using a GenerateFlowFile processor) with default configuration (one thread, no change on run duration, etc) on my laptop.

Method: I’m generating as much data as possible (it’s always the same file during a single run) using the GenerateFlowFile processor. I wait at least 5 minutes to reach a constant processing rate and I take the mean over a 5-minute window of constant processing.

Screen Shot 2017-09-07 at 12.12.12 AM.png

For each run, I’m only running the GenerateFlowFile, one of the three processors I’m benchmarking, and the UpdateAttribute (used to only drop the data).

The input data used for the benchmark is a fairly complex XML document with arrays of arrays, lots of elements in the arrays, deeply nested records, etc. To reduce the input size, I’m not changing the structure but only removing elements in the arrays. In other words: the schema describing the output data remains the same for each run.

Note that the custom Java/Groovy option is loading the full XML document in memory. To process very large XML documents, a streaming approach with another library would certainly be better suited.

Here are the results with input data of 5KB, 10KB, 100KB, 500KB and 1000KB. The graph below gives the number of XML files processed per second based on the input size for each solution.

Screen Shot 2017-09-07 at 10.16.45 PM

It’s clear that the custom Java processor is the most efficient one. The XSLT option is really nice when you want to do very specific transformations but it can quickly get slow. Using a generic XSLT file for XML to JSON transformation is easy and convenient but won’t be the most efficient option.

We can also notice that the Groovy option is a little bit less efficient than the Java one, but that’s expected. Nevertheless, the Groovy option provides pretty good performances and does not require building and compiling a custom processor: everything can be done directly from the NiFi UI.

To improve the performances, it’s then possible to play with the “run duration” parameter and increase the number of concurrent tasks. Actually it’s quite easy to reach the I/O limitations of the disks. Using a NiFi cluster and multiple disks for the content repository, it’s really easy to process hundreds of millions of XML documents per day.

If we display the performance ratio based on the file size between the XSLT solution and the Java based solution, we have:

Screen Shot 2017-09-07 at 10.28.46 PM

We can see that with very small files, the processing using the Java-based processor is about 13x more efficient than the XSLT approach. But with files over 100KB, the Java solution is about 26x more efficient. That’s because the NiFi framework is doing a few things before and after a flow file has been processed. When processing thousands of flow files per second, it creates a small overhead that explains the difference.

XML Record Reader

For a few versions now, Apache NiFi has contained record-oriented processors. They provide very powerful means to process record-oriented data. In particular, they allow users to process batches of data instead of doing “per-file” processing. This provides very robust and high-rate processing. While I’m writing this post, there is no reader for XML data yet. However, there is a JIRA for it and it would provide a few interesting features:

  • By using a schema describing the XML data, it’d remove the need to use ReplaceText processors to handle the “array problem”.
  • It’d give the possibility to merge XML documents together to process much more data at once providing even better performances.

This effort can be tracked under NIFI-4366.

As usual, feel free to post any comment/question/feedback.

Monitoring NiFi – Scripted Reporting Task

Note – This article is part of a series discussing subjects around NiFi monitoring.

In the new release of Apache NiFi (1.2.0), you can now develop Scripted Reporting Task thanks to NIFI-1458. It is the same approach as with the ExecuteScript processor for which you have tons of great examples here.

You might also want to read the following posts:

With the ScriptedReportingTask you can define your own implementation of the onTrigger() method and get access to:

  • ReportingContext context (which gives you access to various information such as events, provenance, bulletins, controller services, process groups, etc)
  • VirtualMachineMetrics vmMetrics (to access the metrics of the JVM)
  • ComponentLog log (if you want to log messages)

Let’s start with a very easy example: I want to log the number of threads inside my JVM every minute. Here is my code:

log.info("Thread count = " + vmMetrics.daemonThreadCount())

Screen Shot 2017-05-12 at 5.43.45 PM.png

And I can check in my nifi-app.log file that I do have:

2017-05-12 17:43:27,639 INFO [Timer-Driven Process Thread-5] o.a.n.r.script.ScriptedReportingTask ScriptedReportingTask[id=fd1668eb-015b-1000-1974-5ef96e1f9a8b] Thread count = 29

OK… now I won’t go into the details of all the information you can access using the “context” variable but let’s try another example…

I want to send a POST request over HTTP containing a JSON representation of the summary of my root process group.

I’m not really used to Groovy so please excuse my coding style ;-). But here is a working code (available here as well):

def json = Class.forName("javax.json.Json")
def httpClients = Class.forName("org.apache.http.impl.client.HttpClients")
def contentType = Class.forName("org.apache.http.entity.ContentType")

def status = context.getEventAccess().getControllerStatus();
def factory = json.createBuilderFactory(Collections.emptyMap());
def builder = factory.createObjectBuilder();

builder.add("componentId", status.getId());
builder.add("bytesRead", status.getBytesRead());
builder.add("bytesWritten", status.getBytesWritten());
builder.add("bytesReceived", status.getBytesReceived());
builder.add("bytesSent", status.getBytesSent());
builder.add("bytesTransferred", status.getBytesTransferred());
builder.add("flowFilesReceived", status.getFlowFilesReceived());
builder.add("flowFilesSent", status.getFlowFilesSent());
builder.add("flowFilesTransferred", status.getFlowFilesTransferred());
builder.add("inputContentSize", status.getInputContentSize());
builder.add("inputCount", status.getInputCount());
builder.add("outputContentSize", status.getOutputContentSize());
builder.add("outputCount", status.getOutputCount());
builder.add("queuedContentSize", status.getQueuedContentSize());
builder.add("activeThreadCount", status.getActiveThreadCount());
builder.add("queuedCount", status.getQueuedCount());

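// Serialize the JSON object and wrap it as the body of the POST request
def requestEntity = new org.apache.http.entity.StringEntity(builder.build().toString(), contentType.APPLICATION_JSON);
def httpclient = httpClients.createDefault();
def postMethod = new org.apache.http.client.methods.HttpPost("http://localhost:9999/rootStatus");
postMethod.setEntity(requestEntity);

// Send the request, then release the client resources
httpclient.execute(postMethod);
httpclient.close();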

Note – this should be improved to, for instance, properly handle potential exceptions.

To get this code working, I also have to specify the required dependencies (JSON, Apache HTTP, etc). For that, in the module directory property of the reporting task, I gave the following paths (because I’m lazy, I am pointing to many more dependencies than required):

  • /var/lib/nifi/work/nar/extensions/nifi-standard-nar-1.2.0.nar-unpacked/META-INF/bundled-dependencies/
  • /var/lib/nifi/work/nar/extensions/nifi-site-to-site-reporting-nar-1.2.0.nar-unpacked/META-INF/bundled-dependencies/

In my example I’m sending my JSON payload with a POST HTTP request to localhost on port 9999 with the path rootStatus. To receive the request, I started a ListenHttp processor with the following configuration:

Screen Shot 2017-05-12 at 8.00.31 PM.png

Once my reporting task is started, I start receiving the information as flow files:

Screen Shot 2017-05-12 at 8.08.55 PM.png

This scripted reporting task allows you to quickly develop a proof of concept to send information to your internal systems using the interfaces you want. However, depending on your needs, it might be more interesting to develop your own reporting task in Java and to build the corresponding NAR. It will give you more flexibility/options (you’ll be able to implement more interfaces) and better performance.

As usual, feel free to ask questions and comment on this post.

Monitoring NiFi – Ambari & Grafana

Note – This article is part of a series discussing subjects around NiFi monitoring.

When using Apache NiFi (note that version 1.2.0 is now released!) as part of HDF, a lot of things are simplified by using Apache Ambari to deploy NiFi and manage its configuration. Also, using the Ambari Metrics service and Grafana, you have a way to easily and visually monitor NiFi performance. And you can also use Apache Ranger to centralize the authorizations management for multiple components (NiFi, Kafka, etc) in one single place.

This article will discuss how you can use Ambari Metrics and Grafana to improve your NiFi monitoring. Let’s start with a quick discussion around AMS (Ambari Metrics System). By default this service is running a Metrics Collector with an embedded HBase instance (and a Zookeeper instance) to store all the metrics, and Ambari will also deploy Metrics Monitor instances on all the nodes of the cluster. The monitors will collect the metrics at system level and send the metrics to the collector. However, the collector also exposes a REST API and that’s what NiFi is going to use with the AmbariReportingTask.


Source and documentation is on the Hortonworks website here.

When using HDF, the Ambari Reporting task should be already up and running for you. If not, you can add it and configure it with a frequency of one minute (it does matter) and use the following parameters:

Screen Shot 2017-05-11 at 9.30.46 AM

Note that “ambari.metrics.collector.url” is an environment variable already set for you when Ambari is starting NiFi. You could also directly give the address, in my case:


Once this reporting task is up and running, you should be able to see the metrics on the NiFi service page in Ambari:

Screen Shot 2017-05-11 at 9.38.57 AM.png

Also, you can go into Grafana to display dashboards with the metrics of your components. You have pre-configured dashboards and here is the one for NiFi:

Screen Shot 2017-05-11 at 9.46.03 AM.png

Now, all the metrics we have here are at cluster level. We are not able to display metrics for specific workflows. With the latest release of Apache NiFi (1.2.0), there is now an optional parameter in the AmbariReportingTask to specify a process group ID. This way, by creating a second reporting task (keep the one providing cluster-level metrics) and by specifying the ID of a specific process group, you can actually create your Grafana dashboards at workflow level.

Let’s say I’ve the following workflow:

Screen Shot 2017-05-11 at 9.52.49 AM

And inside my process group, I have:

Screen Shot 2017-05-11 at 9.52.59 AM.png

Now, my process group having the ID “75973b6e-2d38-1cf3-ffff-fffffdea8cbc”, I can define the following Ambari reporting task:

Screen Shot 2017-05-11 at 9.54.50 AM.png

Note – you must keep “nifi” as the Application ID as it has to match the configuration of the Ambari Metrics System.

Once your reporting task is running, in Grafana, you can create your own dashboard for this workflow and display the metrics you want:

Screen Shot 2017-05-11 at 10.08.59 AM.png

For my Kafka example, here is the dashboard I defined:

Screen Shot 2017-05-11 at 10.39.47 AM.png

In this example, I can see that my workflow is running fine but the free disk space on one of my nodes is decreasing very quickly. It turns out that when my disk is completely filled, back pressure will be enabled in my workflow and there is no more data sent to Kafka. Instead, data is queued in NiFi.

This simple example gives me a lot of information:

  • Everything is default configuration in Ambari and I chose my three NiFi nodes to also host Kafka brokers. By default, for Kafka, the replication factor is set to 1, the number of partitions is set to 1 and the automatic creation of topics is allowed (that’s why I didn’t need to create the topic before starting my workflow). Because of the default parameters, all of the data is sent to only one Kafka broker (pvillard-hdf-2) and that’s why the disk space is quickly decreasing on this node since my three NiFi nodes are sending data to this broker.
  • Also, we clearly see that it’s not a good idea to colocate NiFi and Kafka on the same nodes since they are both I/O intensive. In this case, they are using the same disk… and we can see that the task duration (for NiFi) is clearly higher on the Kafka node that is receiving the data (pvillard-hdf-2). Long story short: keep NiFi and Kafka on separate nodes (or at the very least on different disks).

HDF and the Ambari Metrics System give you the ability to create custom, relevant dashboards for specific use cases. They also allow you to mix information from Kafka, from NiFi and from the hosts to have all the needed information in one single place.

Also, by using the REST API of the Metrics Collector (you may be interested in this article), you could send your own data (not only the data gathered at the process group level) to add more information to your dashboards. An example that comes to mind would be to send the lineage duration (see Monitoring of Workflow SLA) at the end of the workflow using an InvokeHTTP processor and sending a JSON payload using a POST request to the API endpoint.

Let’s say I want to monitor how long it takes between my GenerateFlowFile and the end of my workflow to check if some particular events are taking longer. Then I could have something like:

Screen Shot 2017-05-11 at 5.58.02 PM.png

What am I doing here? I want to send to AMS the information about the lineage duration of the flow files I sent into my Kafka topic. However I don’t want to send the duration of every single event (that’s not really useful and it’s going to generate a lot of requests/data). Instead I want to make an API call only once per minute. The idea is to compute the mean and max of the lineage duration with a rolling window of one minute and to only send this value to AMS.

I could use the new AttributeRollingWindow processor but it is not as fast as the PublishKafka and I don’t want to generate back pressure in my relationships. So I use the InvokeScriptedProcessor to build my own rolling processor (it’s faster because I am not using any state information):

  • this processor takes a frequency duration as a parameter (that I’ll set to 1 minute in this example)
  • for every flow file coming in, it extracts the lineage start date to compute the max and mean lineage duration over the rolling window. If the last flow file sent to the success relationship was less than one minute ago, I route the flow file to the drop relationship (that I set to auto-terminated). If it was more than one minute ago, I update the attributes of the current flow file with the mean and max of all the flow files since the last “success” flow file and route this flow file to the success relationship

Since I have flow files coming into my processor at a high rate, I know that my processor will release one flow file every minute with the mean and max of the lineage duration for the flow files of the last minute.
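The logic of this rolling processor can be sketched in a few lines of Python. This is not the Groovy script I used in the InvokeScriptedProcessor, only a model of the behaviour described above. The class name, the attribute names ("lineage.mean", "lineage.max") and the choice to start the first window at the first flow file are assumptions made for the example.

```python
class RollingLineage:
    """Aggregate lineage durations and release one summary per period."""

    def __init__(self, period_ms=60_000):
        self.period_ms = period_ms
        self.last_release_ms = None  # time of the last "success" flow file
        self.count = 0
        self.total_ms = 0
        self.max_ms = 0

    def on_flow_file(self, now_ms, lineage_start_ms):
        """Return ("success", attributes) once per period, else ("drop", None)."""
        duration = now_ms - lineage_start_ms
        self.count += 1
        self.total_ms += duration
        self.max_ms = max(self.max_ms, duration)

        if self.last_release_ms is None:
            # Assumption: the very first flow file only opens the window.
            self.last_release_ms = now_ms
            return "drop", None
        if now_ms - self.last_release_ms < self.period_ms:
            return "drop", None

        # Period elapsed: publish the aggregates and start a new window.
        attributes = {
            "lineage.mean": self.total_ms / self.count,
            "lineage.max": self.max_ms,
        }
        self.last_release_ms = now_ms
        self.count = 0
        self.total_ms = 0
        self.max_ms = 0
        return "success", attributes


window = RollingLineage(period_ms=60_000)
first = window.on_flow_file(1_000, 900)        # duration 100 ms
second = window.on_flow_file(30_000, 29_800)   # duration 200 ms
third = window.on_flow_file(61_000, 60_850)    # duration 150 ms, period elapsed
print(first, second, third)
```

Everything is kept in plain instance fields, with no state persisted between restarts, which matches the idea of a light processor that does not slow down the flow.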

Then I use a ReplaceText processor to construct the JSON payload that I’ll send to the Metrics Collector using the InvokeHttp processor.

Here is the configuration of the InvokeScriptedProcessor:

Screen Shot 2017-05-11 at 7.54.00 PM

The Groovy script used can be found here.

Then I create the JSON payload with the ReplaceText processor:

Screen Shot 2017-05-11 at 7.56.33 PM.png

Note that I use the ID of the processor as the “instanceid” attribute in the JSON.
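For readers who cannot see the screenshot, here is a hedged Python sketch of how such a payload could be assembled. The field layout is what I understand the Metrics Collector API to expect and should be checked against the AMS documentation. The metric name, host name and instance ID used below are hypothetical, and only the "instanceid" field and the "nifi" application ID come from this article.

```python
import json


def build_ams_payload(metric_name, value, timestamp_ms, hostname, instance_id, app_id="nifi"):
    """Build the JSON body for a single data point (field names are assumptions)."""
    metric = {
        "metricname": metric_name,
        "appid": app_id,             # must match the AMS configuration
        "instanceid": instance_id,   # here, the ID of the processor
        "hostname": hostname,
        "timestamp": timestamp_ms,
        "starttime": timestamp_ms,
        # One value, keyed by its timestamp in milliseconds.
        "metrics": {str(timestamp_ms): value},
    }
    return json.dumps({"metrics": [metric]})


payload = build_ams_payload(
    metric_name="lineage.mean",          # hypothetical metric name
    value=150.0,
    timestamp_ms=1494525600000,
    hostname="nifi-node-1.example.com",  # hypothetical host
    instance_id="processor-id",          # hypothetical ID
)
print(payload)
```

In the flow itself, the same JSON is produced by the ReplaceText processor using the attributes set by the scripted processor.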

Then, I use the InvokeHttp processor (with a scheduling/frequency of 1 minute):

Screen Shot 2017-05-11 at 7.57.45 PM.png

Now, I can use this information to build the corresponding graph in my Grafana dashboard:

Screen Shot 2017-05-11 at 7.59.27 PM

I can see that, on average, it takes about 150 milliseconds to generate my flow file, publish it in my Kafka topic and get it into my scripted processor. I could also generate one metric per host of my cluster to check if a node is performing badly compared to the others.

Now you can easily send your custom data into AMS and create dashboards for your specific use cases and workflows.

As usual, feel free to ask questions and comment on this post.

Monitoring NiFi – Workflow SLA

Note – This article is part of a series discussing subjects around NiFi monitoring.

Depending on the workflows and use cases, you may want to retrieve some metrics per use-case/workflow instead of high level metrics. One of the things you could be looking for is Workflow SLA.

First of all, we need to agree on what is “Workflow SLA”. In this article, by SLA (Service-Level Agreement) monitoring I mean that I want to ensure that a workflow is processing every single event quickly enough. In other words, if an event took a long time to be processed, I want to be aware of that so I can understand what is going on. A workflow could be perfectly running in NiFi (no bulletin generated, no error) but the processing time could get bigger because of external causes (resources exhaustion, network bandwidth with external systems, abnormal event size, etc).

Luckily for us, every single flow file within NiFi contains two core attributes that are perfectly designed for our needs. From the documentation:

  • entryDate: The date and time at which the FlowFile entered the system (i.e., was created). The value of this attribute is a number that represents the number of milliseconds since midnight, Jan. 1, 1970 (UTC).
  • lineageStartDate: Any time that a FlowFile is cloned, merged, or split, this results in a “child” FlowFile being created. As those children are then cloned, merged, or split, a chain of ancestors is built. This value represents the date and time at which the oldest ancestor entered the system. Another way to think about this is that this attribute represents the latency of the FlowFile through the system. The value is a number that represents the number of milliseconds since midnight, Jan. 1, 1970 (UTC).

In our case we are really interested in the “lineageStartDate” attribute, but depending on your specific use cases and needs you could also be interested in “entryDate”.

At the end of the workflow I want to monitor, I can use a RouteOnAttribute processor to route the event if and only if the duration since the lineage start date is over a given threshold (let’s say one minute for the example below). If an event took longer than expected, I route the flow file to a PutEmail processor so that I receive a notification by email.

Screen Shot 2017-05-10 at 3.33.34 PM.png

My RouteOnAttribute configuration:

Screen Shot 2017-05-10 at 3.34.02 PM.png
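The routing condition boils down to a simple comparison on epoch milliseconds. Here is a minimal Python sketch of that logic, assuming the one-minute threshold from the example; the function name is hypothetical, and the Expression Language line in the comment is my guess at an equivalent property value, not a transcription of the screenshot.

```python
import time

SLA_THRESHOLD_MS = 60_000  # one minute, as in the example above


def exceeds_sla(lineage_start_ms, now_ms=None, threshold_ms=SLA_THRESHOLD_MS):
    """Return True when the time elapsed since lineageStartDate is over the threshold.

    A RouteOnAttribute property along these lines would probably express
    the same check (assumption, to be verified in your NiFi version):
        ${now():toNumber():minus(${lineageStartDate}):gt(60000)}
    """
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return (now_ms - lineage_start_ms) > threshold_ms
```

An event whose oldest ancestor entered the system 61 seconds ago would be routed to the notification branch, while one that entered 30 seconds ago would not.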

This way, at a workflow level, you are able to check if the processing time of an event is meeting your requirements. You could obviously follow the same approach to check the processing time at multiple critical points of the workflow.

Note that, instead of sending an email notification, you could just as well use any other kind of processor to integrate this alert/event with any system you are using on your side.

One last comment: I recommend checking out the article about the Ambari Reporting Task and Grafana, where I also discuss Workflow SLA and how to send the metrics to Ambari Metrics System and display the information in a Grafana dashboard.

As usual, feel free to ask questions and comment on this post.

Monitoring NiFi – Site2Site reporting tasks

Note – This article is part of a series discussing subjects around NiFi monitoring.

If we look at the development documentation about reporting tasks:

So far, we have mentioned little about how to convey to the outside world how NiFi and its components are performing. Is the system able to keep up with the incoming data rate? How much more can the system handle? How much data is processed at the peak time of day versus the least busy time of day?

In order to answer these questions, and many more, NiFi provides a capability for reporting status, statistics, metrics, and monitoring information to external services by means of the ReportingTask interface. ReportingTasks are given access to a host of information to determine how the system is performing.

Out of the box, you have quite a lot of available reporting tasks and, in this article, we are going to focus on a few of them (have a look at the other articles to find out more about the other reporting tasks).

Before going into the details, since I am going to discuss Site To Site related reporting tasks, we need to understand what Site To Site (S2S) is:

When sending data from one instance of NiFi to another, there are many different protocols that can be used. The preferred protocol, though, is the NiFi Site-to-Site Protocol. Site-to-Site makes it easy to securely and efficiently transfer data to/from nodes in one NiFi instance or data producing application to nodes in another NiFi instance or other consuming application.

In this case, we are going to use S2S reporting tasks. This means that the reporting tasks run continually to collect data and send it to a remote NiFi instance, but you can also use S2S to send the data to the very instance running the reporting task. This way, by using an input port on the canvas, you can actually receive the data generated by the reporting task and use it in a NiFi workflow. That’s really powerful because, now, you can use all the NiFi capabilities to process this data and do whatever you want with it.

Let’s go over some examples!

  • Monitoring bulletins

With the new version of Apache NiFi (1.2.0), you can transform every bulletin into a flow file sent to any NiFi instance using Site-To-Site. This way, as soon as a processor / controller service / reporting task generates a bulletin, it can be converted into a flow file that you can use to feed any system you want.

Let’s configure this reporting task to send the bulletins (as flow files) to an input port called “bulletinMonitoring” and use the flow files to send emails.

First, since my NiFi cluster is secured, I create a StandardSSLContextService in the Controller Services tab of the Controller Settings menu (this way, it can be used by reporting tasks).

Screen Shot 2017-05-10 at 12.28.54 PM.png

Screen Shot 2017-05-10 at 12.28.46 PM.png

Then, I can define my reporting task by adding a new SiteToSiteBulletinReportingTask:

Screen Shot 2017-05-10 at 12.30.55 PM.png

Screen Shot 2017-05-10 at 12.31.29 PM.png

Before starting the task, on my canvas, I have the following:

Screen Shot 2017-05-10 at 12.32.55 PM.png

Note – in a secured environment, you need to set the correct permissions on the components. You need to allow NiFi nodes to receive data via site-to-site on the input port and you also need to grant the correct permissions on the root process group so the nodes are able to see the component, view and modify the data.

I configured my PutEmail processor to send emails using the Gmail SMTP server:

Screen Shot 2017-05-10 at 12.45.13 PM.png

Now, as soon as bulletins are generated, I’ll receive a notification by email containing my message with the attributes of the flow file, and the bulletins will be attached to the email.

Obviously, instead of sending emails, you could, for example, use some other processors to automatically open tickets in your ticketing system (like JIRA, using its REST API).
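If you post-process the bulletins outside of PutEmail, the logic could look like the Python sketch below. It assumes the flow file content is a JSON array of bulletins and that each one carries bulletinLevel, bulletinSourceName and bulletinMessage fields; these names and the level strings are assumptions to verify against the flow files you actually receive.

```python
import json

# Level names are an assumption; both spellings of "warning" are accepted.
DEFAULT_ALERT_LEVELS = ("WARN", "WARNING", "ERROR")


def bulletins_to_alerts(flow_file_content, alert_levels=DEFAULT_ALERT_LEVELS):
    """Turn a JSON array of bulletins into one alert line per serious bulletin."""
    alerts = []
    for bulletin in json.loads(flow_file_content):
        level = str(bulletin.get("bulletinLevel", "")).upper()
        if level in alert_levels:
            source = bulletin.get("bulletinSourceName", "unknown source")
            message = bulletin.get("bulletinMessage", "")
            alerts.append(f"[{level}] {source}: {message}")
    return alerts
```

Each returned line could then become the summary of a ticket or the body of a chat message, depending on the system you integrate with.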

  • Monitoring disk space

Now, using the task we previously set up, we can take advantage of the task monitoring disk space. This reporting task will generate warn logs (in the NiFi log file) and bulletins when the usage of the monitored disk partition exceeds a custom threshold. If I want to monitor the Content Repository, I can configure my reporting task as below:

Screen Shot 2017-05-10 at 1.28.24 PM.png

Screen Shot 2017-05-10 at 1.28.34 PM.png

Using the combination of this Reporting Task and the SiteToSiteBulletinReportingTask, I’m able to generate flow files when the disk utilization is reaching a critical point and to receive notifications using all the processors I want.
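To make the threshold idea concrete, here is a small Python sketch of a disk usage check. It only illustrates the principle and is not how the NiFi reporting task is implemented; the 80% default and the function names are assumptions.

```python
import shutil


def disk_usage_percent(path):
    """Return the percentage of the partition holding `path` that is in use."""
    usage = shutil.disk_usage(path)
    return 100.0 * usage.used / usage.total


def over_threshold(used_percent, threshold_percent=80.0):
    """Return True when usage is strictly above the configured threshold."""
    return used_percent > threshold_percent
```

For example, over_threshold(disk_usage_percent("."), 80.0) checks the partition of the current directory; in a real setup you would point it at the directory of the repository you care about.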

  • Monitoring memory use

The same approach can be used to monitor the memory utilization within the NiFi JVM using the MonitorMemory reporting task. Have a look at the documentation of this reporting task here.

  • Monitoring back pressure on connections

There is also the SiteToSiteStatusReportingTask, which sends details about the NiFi Controller status over Site-to-Site. This can be particularly useful to be notified when some processors are stopped or queues are full (and back pressure is enabled), or to build reports regarding NiFi performance. This reporting task will be slightly improved regarding back pressure (with NIFI-3791). In the meantime, if you want to receive notifications when back pressure is enabled on any connection, here is what you can do (assuming you know the back pressure thresholds):

Screen Shot 2017-05-10 at 2.07.44 PM.png

Screen Shot 2017-05-10 at 2.09.14 PM.png

Note that I configured the task to only send data about the connections but you can receive information for any kind of component.

And I use the following workflow. My input port receives the flow files with the controller status (containing an array of JSON elements for all of my connections). I split this array using a SplitJson processor, then I use EvaluateJsonPath to extract the values queuedCount and queuedBytes as attributes. Finally, I use a RouteOnAttribute processor to check whether either of the two attributes is greater than or equal to my thresholds, and if that’s the case I send the notification by email.

Screen Shot 2017-05-10 at 2.31.23 PM.png

My RouteOnAttribute configuration:

Screen Shot 2017-05-10 at 3.33.46 PM
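The same split / extract / route logic can be sketched in a few lines of Python. The queuedCount and queuedBytes fields come from the workflow above; the componentName field and the default thresholds are assumptions, so use the values configured on your own connections.

```python
import json


def connections_over_threshold(status_json, max_count=10_000,
                               max_bytes=1_073_741_824):
    """Return the names of connections whose queue reached a threshold.

    `status_json` is expected to be a JSON array with one element per
    connection, as received on the input port.
    """
    offenders = []
    for connection in json.loads(status_json):  # SplitJson equivalent
        count = connection.get("queuedCount", 0)  # EvaluateJsonPath equivalent
        size = connection.get("queuedBytes", 0)
        # RouteOnAttribute equivalent: greater than or equal to a threshold.
        if count >= max_count or size >= max_bytes:
            offenders.append(connection.get("componentName", "unknown"))
    return offenders
```

A non-empty result is the case where the flow above would send the notification by email.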

Site to site reporting tasks are really useful and there are many ways to use the data they can send for monitoring purposes.

  • SiteToSiteProvenanceReportingTask

Note that you also have a Site2Site reporting task to send all the provenance events over S2S. This can be really useful if you want to send this information to an external system.

While monitoring a dataflow, users often need a way to determine what happened to a particular data object (FlowFile). NiFi’s Data Provenance page provides that information. Because NiFi records and indexes data provenance details as objects flow through the system, users may perform searches, conduct troubleshooting and evaluate things like dataflow compliance and optimization in real time. By default, NiFi updates this information every five minutes, but that is configurable.

Besides, with NIFI-3859, this could also be used in a monitoring approach to only look for specific events according to custom parameters. It could be used, for instance, to check how long it takes for an event to go through NiFi and raise alerts in case an event took an unusually long time to be processed (have a look at the next article to see how this can be done differently).

As usual, feel free to ask questions and comment on this post.