FoD Paris Jan '18 – NiFi Registry and workflow monitoring with a use case

I got the chance to talk a bit about NiFi during the last Future of Data meetup in Paris, and I wanted to share in a blog what I explained during this event.

First of all, I want to remind you that Apache NiFi 1.5.0, MiNiFi 0.4.0 and NiFi Registry 0.1.0 are out, and together they represent a huge step forward for the community as they bring flow development life cycle (FDLC) management to a completely new level. Besides, this first version of the registry is a solid foundation for what will come in the future.

OK, let's get back to what I discussed at the meetup.

  • Introduction to NiFi Registry

What is the NiFi Registry? It's a new component of the NiFi ecosystem: a web-based application meant to be used as a central location for storage and management of shared resources across one or more instances of NiFi and/or MiNiFi.

In a typical multi-clusters environment, this could be represented as below:

Screen Shot 2018-02-02 at 2.03.45 PM.png

At the moment, the integration between NiFi and the NiFi Registry allows you to store, retrieve, and upgrade versioned flows. Future versions will allow users to store additional types of resources. And if you're asking… yes, the NiFi Registry can be secured and provides authorization support to control users and access policies to the versioned flows.

Note that, even if you have a single environment, using the NiFi Registry will also make a lot of sense. Firstly, it’s a convenient way to save your work and to version it (and to be honest, this argument alone should convince you), but it’s also a practical way to share common reusable parts of flows between multiple workflows.

For example, let's say you have a process group designed to perform a very common task like transforming XML data to JSON and adding attributes to the flow files. If you have to update this process group with a few changes and the process group is used in multiple places, you don't need to copy/paste your changes everywhere. You just have to version it, and as soon as you update it, you'll be able to update all the other instances of this process group to the latest version.

  • NiFi monitoring NiFi

I already posted some blogs about NiFi monitoring, but it's a recurring subject and we have quite a lot of options to meet whatever monitoring requirements you might have.

What do we mean by “monitoring”? In my opinion, we have two main subjects:

  1. Dashboarding: expose various metrics data to measure the application performance and stability.
  2. Alerting: send notifications when something is wrong.

In the context of NiFi, we can also have two types of monitoring:

  1. “Technical” monitoring: is the NiFi service working as expected? are the nodes up and alive?
  2. “Business” monitoring: is a particular workflow for a given project behaving as expected?

For both types of monitoring, my recommendation is to use the reporting tasks available in NiFi. Reporting tasks provide an efficient way to extract and process the data generated by the framework, for technical metrics as well as workflow-related metrics.

  • Use case study

I'm going to build this blog around a very simple use case that demonstrates the use of the NiFi Registry to perform FDLC tasks. I'll also use it as a baseline to show various ways of monitoring both the NiFi service and a particular workflow.

The use case is to expose an HTTP REST endpoint with NiFi so that an application can send data to NiFi through HTTP calls. The idea is to route the received data based on the URI used in the HTTP call. For each type of data, I'll validate the XML against a schema, convert the XML to JSON, perform simple data transformations and send the data to a Kafka topic.

  • NiFi monitoring: survival kit

Before going into the details of the use case, I'll give my personal opinion on the minimum setup for efficient NiFi monitoring. Here are the six reporting tasks I recommend configuring:

Screen Shot 2018-02-02 at 3.15.03 PM.png

# The Ambari reporting task is used to send metrics to the Ambari Metrics Service (obviously, this reporting task only makes sense if you have Ambari and AMS available, and, if not, there are similar reporting tasks for different systems). When using the reporting task, the following metrics will be available in Ambari on the NiFi service page:

Screen Shot 2018-02-02 at 3.51.48 PM.png

Things to monitor carefully are:

  1. JVM heap usage, to avoid OutOfMemory errors. Even though NiFi does not require a lot of RAM, some processors/use cases might need more. It also depends on how many flow files NiFi is going to handle at one time.
  2. Active threads is the mean number of active threads per node in the NiFi cluster. By default, each NiFi node initializes a pool of 10 threads for timer-driven components. If you see that this metric is constantly equal to the size of the thread pool, you might need to increase the thread pool size. But this number needs to be changed with care, in coordination with the CPU/load metrics of your NiFi nodes. To change the thread pool size, go into Controller Settings / General (the change is immediately effective, no restart required).
  3. Flow files queued is the number of flow files currently queued in NiFi's connections. If this graph is continually increasing, it can be a symptom of something going wrong. Peaks at specific hours of the day can be perfectly normal depending on the use cases, and NiFi will protect itself using the backpressure mechanism, but in any case it's always good to keep an eye on this metric.
  4. Total task duration can also be something to look at: if this graph keeps going up, it means it takes more and more time to process the data. That can be expected if data volumes are changing or if workflows have been updated, but it can also be the symptom of a processor not performing well.
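For reference, configuring the Ambari reporting task itself only takes a handful of properties. Here is a sketch (the collector URL and hostname are assumptions to adapt to your environment; the optional Process Group ID lets you scope the metrics to a single process group, something I'll come back to later):

    Metrics Collector URL: http://<ambari-metrics-host>:6188/ws/v1/timeline/metrics
    Application ID: nifi
    Hostname: ${hostname(true)}
    Process Group ID: (empty to report instance-wide metrics)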

# The MonitorDiskUsage reporting tasks are useful to make sure that bulletins are emitted before you hit a “no space left on device” error. I recommend setting one per repository (at least 3, more if you have multiple content/provenance repositories). In my screenshot, I defined one for the provenance repository, one for the content repository and one for the flow file repository.
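Each of these tasks only needs a threshold and a directory to watch. A sketch for the content repository one (the path is an assumption, use the values from your nifi.properties):

    Threshold: 80%
    Directory Location: /data/nifi/content_repository
    Directory Display Name: Content repository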

# The Site to Site bulletin reporting task is used to reingest all the generated bulletins into NiFi. Generally speaking, Site to Site reporting tasks are a really powerful tool as they allow you to reingest data generated by NiFi's framework into NiFi itself, so you can use all the processors you want to process this data. With this specific reporting task, you're able to deal with every generated bulletin in a dedicated workflow and do whatever suits you: email notifications, dashboarding, automatic issue creation in JIRA, etc.

# The Site to Site status reporting task is used to frequently take a snapshot of what is going on in all the workflows on the canvas. It generates a JSON array of JSON documents representing the status of every component (processors, ports, relationships, etc.), including the statistics of the last 5 minutes.
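To give an idea of what this data looks like, here is an abridged, purely illustrative example of a record for a connection (the exact field names are defined by the schema of the reporting task):

    {
      "componentType" : "Connection",
      "componentName" : "success",
      "componentId" : "f7ebb153-015b-1000-...",
      "queuedCount" : 12,
      "queuedBytes" : 65536,
      "isBackPressureEnabled" : "false"
    }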

  • NiFi monitoring NiFi

Using strict naming conventions, you can leverage generic workflows dedicated to monitoring and do quite powerful things. My approach is really to use NiFi as a way to monitor itself. You might ask: what is going to monitor the workflows dedicated to monitoring? To be honest, if something goes wrong with the monitoring workflows, you will detect it very quickly through the technical monitoring / Ambari reporting task. Or the NiFi service will just turn red in Ambari…

Consequently, my personal recommendation is to ask each project team to respect a strict naming convention and to deliver a workflow designed to fulfill the monitoring requirements for the project workflows. Each workflow will certainly require different notification mechanisms, and different metrics in the dashboards.

At the root level of the canvas, I’ll have one process group per project in order to ensure proper authorization policies. I’ll also have a process group dedicated to monitoring, and two input ports to receive the data sent by reporting tasks (with NIFI-4814, it’ll be possible to only use one):

Screen Shot 2018-02-07 at 2.34.36 PM

In this use case study, I assume that the naming convention is the following: <acronym of the project>_<meaningful component name>

Besides, if the metrics of a component should be captured for dashboarding, the name should be suffixed by _GRAFANA.
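For instance, with a hypothetical project whose acronym is PJA, component names would look like:

    PJA_ConvertXMLtoJSON        (processor, no dashboarding)
    PJA_HTTP_200_GRAFANA        (connection whose statistics are exported)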

With the above assumptions, here is what I have in my monitoring process group:

Screen Shot 2018-02-07 at 2.39.16 PM

The UpdateAttribute is used to add a “type” attribute with the value “bulletins” or “status”. I’ll use this attribute to perform routing at a later stage.

The UpdateRecord is used to populate a “project” field by extracting the acronym from the component name. Here is the schema I use for bulletins, and here is the schema I use for status.

For bulletins:

Screen Shot 2018-02-07 at 2.46.21 PM.png

For status data:

Screen Shot 2018-02-07 at 2.46.41 PM
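As a sketch, assuming the status records expose the component name in a componentName field (bulletinSourceName for bulletins) and that the substringBefore record path function is available, the UpdateRecord configuration boils down to one dynamic property:

    Replacement Value Strategy: Record Path Value
    /project: substringBefore( /componentName, '_' )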

At this point, I have bulletins and status data and I'm able to link this data to every project running in NiFi. Now I send this data to two process groups: one dedicated to technical monitoring (used and managed by the team in charge of the NiFi service), and one dedicated to business (per-project) monitoring:

Screen Shot 2018-02-07 at 2.49.20 PM

Technical monitoring

For the technical monitoring, it's really up to you… What I usually do is send the bulletin data to a tool like AMS, Solr or Elasticsearch to build dedicated dashboards. I also set up email notifications for when backpressure is enabled in a connection.

For bulletin dashboarding, I slightly change the schema of the data and send it to an Elasticsearch instance:

Screen Shot 2018-02-07 at 2.53.40 PM.png

I then create a dashboard in Grafana:

Screen Shot 2018-02-07 at 3.00.03 PM

If necessary I can use the notification mechanism available in Grafana to get email messages when some thresholds are reached.

With the status data, I decided to design a workflow sending email notifications as soon as backpressure is enabled in a connection:

Screen Shot 2018-02-07 at 9.28.30 PM

To do that, I use the QueryRecord processor to only select records representing connections with backpressure enabled:

Screen Shot 2018-02-07 at 9.30.32 PM.png
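The query itself is plain SQL over the incoming records. A sketch, assuming the status schema exposes an isBackPressureEnabled field for connections:

    SELECT *
    FROM FLOWFILE
    WHERE isBackPressureEnabled = 'true'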

Again… with the site-to-site reporting tasks, you are ingesting NiFi's own data back into NiFi, and you can use all the processors you want to process this data and fulfill your requirements.

Business (per-project) monitoring

If we go back to the first level of my monitoring process group, you'll notice that I'm also sending both bulletin and status data to another process group for “per-project” monitoring:

Screen Shot 2018-02-07 at 2.49.20 PM

In this process group, here is what I’m doing:

Screen Shot 2018-02-07 at 9.35.11 PM

I'm using a PartitionRecord processor to partition all the data based on the /project field we computed earlier. This way, each flow file only contains data with a unique value for the /project field, and this value is also extracted as an attribute of the flow files. Then I just have to use a RouteOnAttribute processor to route the data based on the project that generated it (if a project didn't respect the naming convention, the project value will not match any project acronym and the data will be dropped, unless you want to process it somehow – to detect teams not respecting the naming convention!). After the RouteOnAttribute, there is one process group per project, and that's the specific piece that should be developed by every project deployed in NiFi. This way, each project receives the bulletins and status data related to it, and each project can implement its own workflow to deal with that data (see the sketch below).
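As a sketch, the PartitionRecord processor takes a dynamic property mapping an attribute name to a record path, and the RouteOnAttribute processor (with the “Route to Property name” strategy) takes one property per project; PJA and PJB are hypothetical acronyms:

    PartitionRecord:
        project: /project

    RouteOnAttribute:
        PJA: ${project:equals('PJA')}
        PJB: ${project:equals('PJB')}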

Before presenting what monitoring I’m doing for the use case of this post, I’ll step back a little and present the use case itself and how I leverage the NiFi Registry to deploy my workflow from a development environment to a production environment.

  • Use case presentation

The use case itself is pretty simple and I won't go too much into the details since that's not the purpose of this post. I'm using a HandleHttpRequest processor to listen for HTTP calls on a given port. Then, based on the URI called by the client, I'm routing the flow files to different process groups. I'm expecting to receive HTTP POST requests on specific endpoints, each one representing a type of data. The received data is expected to be XML, and I'm checking that it is valid according to an XSD schema. At this point, three options:

  1. if the client requests an endpoint that is not expected, I route the flow file to the HandleHttpResponse processor and return a 404 HTTP code.
  2. if the client posts data on the expected endpoint but the data is not valid, I route the flow file to the HandleHttpResponse processor and return a 400 HTTP code.
  3. if the received data is valid, then I return a 200 HTTP code.

Screen Shot 2018-02-07 at 9.56.24 PM

Screen Shot 2018-02-07 at 9.57.00 PM.png

What I’m doing with valid data in the process group does not really matter and I won’t describe that (this post is already way too long and I’m surprised you’re still reading…).

  • Deployment with NiFi Registry

Let's try to keep things simple. The NiFi Registry is a web application living outside of NiFi. However, NiFi integrates with the NiFi Registry. This is done by going into the Controller Settings menu, where there is a new tab to define the registries to connect to:

Screen Shot 2018-02-07 at 10.01.54 PM.png

With this integration between NiFi and NiFi Registry, you have new icons in the UI you need to get familiar with:

Screen Shot 2018-02-07 at 10.04.10 PM.png

From left to right:

  1. Number of versioned Process Groups that are up to date with the Registry
  2. Number of versioned Process Groups that are locally modified (you can commit your local changes to create a new version of your process group in the Registry)
  3. Number of versioned Process Groups that are stale (there is a more recent version of the Process Group in the Registry and you can update your local process group to this version)
  4. Number of versioned Process Groups that are stale and also with local changes
  5. Number of versioned Process Groups with a synchronization failure (the Registry is not available / unreachable, etc)

The interactions with the Registry through the NiFi UI can be done by right clicking on a process group or on the canvas inside a process group: this will show a Version menu with available commands.

Screen Shot 2018-02-07 at 10.09.36 PM.png

On the NiFi Registry side, here is what it looks like:

Screen Shot 2018-02-07 at 10.12.33 PM.png

In the Registry, it’s possible to create buckets (you’ll usually have one bucket per project). Then, from the NiFi UI, you’ll be able to start versioning a workflow in a bucket. To do that: right click on the process group, Version menu, start version control.

Screen Shot 2018-02-07 at 10.14.44 PM.png

Then you’ll be able to choose the bucket and to give some information about your workflow:

Screen Shot 2018-02-07 at 10.15.18 PM

You now have your workflow versioned in the Registry!

Screen Shot 2018-02-07 at 10.16.39 PM

Now… how do I deploy my workflow on the production cluster?

I just have to drag and drop a process group component on the canvas, and I’ll see:

Screen Shot 2018-02-07 at 10.27.14 PM.png

And I can just click on Import to select the bucket, workflow and version I want:

Screen Shot 2018-02-07 at 10.28.26 PM

I now have my workflow imported on the production cluster. Two things to do:

  1. Update the variables of the process group that are used to externalize environment-specific properties of the workflow components. Note that changes to variable values are not considered local changes from the Registry's point of view, because two instances of the same workflow version will have different values in different environments.
  2. Start the controller services and processors. When importing a new workflow for the first time, everything is stopped to let users change the variables. When updating an existing workflow to a new version, only the new components will need to be started (no service interruption).

To access variables, you can right click on the process group and click on the Variables menu:

Screen Shot 2018-02-07 at 10.46.10 PM.png

And then you can update the variables with values corresponding to your environment:

Screen Shot 2018-02-07 at 10.46.20 PM.png

If you have multi-level process groups and you defined variables at different levels, you will also need to update the variables in sub-process groups.

  • Business monitoring for my use case

If you paid attention to my screenshot of the NiFi Registry, you probably noticed two versioned workflows in the bucket dedicated to my use case. The second workflow is the one dedicated to monitoring, which you also noticed in a previous screenshot:

Screen Shot 2018-02-07 at 9.35.11 PM

Let’s quickly discuss what I suggest for project-specific monitoring. The first thing you might want to do is to receive email notifications when bulletins are generated by components of your workflow.

Then, what I usually suggest for per-project dashboarding is to export the statistics of specific points of the workflow to an external tool and create dashboards there. As I said before, in my example, I'm exporting to the Ambari Metrics Service the 5-minute count and byte sum of the flow files going through connections whose name is suffixed with _GRAFANA. In my example, I renamed connections accordingly to automatically export statistics and get the number of HTTP 200, 400 and 404 codes:

Screen Shot 2018-02-07 at 10.40.04 PM.png

And here is my monitoring workflow for this specific use case:

Screen Shot 2018-02-07 at 10.42.43 PM.png

I route the monitoring data to apply specific processing to bulletins (email notifications in this case) and specific processing to status data. First, I use a PartitionRecord processor and a RouteOnAttribute to extract only the status data about connections (I ignore data about processors, process groups, ports, etc.).

Screen Shot 2018-02-07 at 10.45.39 PM.png

Then I split my JSON arrays and extract the field containing the name of the connection in order to keep only the connections suffixed with _GRAFANA. Finally, I transform the content to match the format expected by AMS and send the request to AMS (more details here).
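For reference, the AMS collector expects a POST on its /ws/v1/timeline/metrics endpoint with a JSON payload along these lines (metric name, host and values are illustrative); and once the connection name is extracted into an attribute, keeping only the _GRAFANA connections is a one-liner like ${name:endsWith('_GRAFANA')}:

    {
      "metrics" : [ {
        "metricname" : "PJA_HTTP_200",
        "appid" : "nifi",
        "hostname" : "nifi-node-1",
        "timestamp" : 1518041620000,
        "starttime" : 1518041620000,
        "metrics" : { "1518041620000" : 42 }
      } ]
    }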

And… that’s it… I’m now able to create Grafana dashboards for my specific use case:

Screen Shot 2018-02-07 at 10.53.06 PM.png

Another thing you might want to do is to define a dedicated Ambari Reporting Task with the UUID of the process group dedicated to the project. This way you have additional statistics about this specific process group: number of active threads, number of flow files queued, volume of data received, etc.

  • What about workflow update?

OK… so I deployed my workflows in production and have great ways to monitor things. What if I have a new endpoint to handle in my workflow? Really simple! So far I only process “purchase” data; let's add another process group to deal with “customer” data:

Screen Shot 2018-02-07 at 11.03.15 PM

I updated my RouteOnAttribute to accept a new endpoint and added a process group to process this data. I added my HTTP 200 & 400 relationships for this new type of data and I carefully renamed the output connections to automatically get this new data into my dashboards.

I can now see that my process group does have local changes:

Screen Shot 2018-02-07 at 11.06.34 PM.png

And I can commit the changes as a new version of my workflow:

Screen Shot 2018-02-07 at 11.07.19 PM.png

Screen Shot 2018-02-07 at 11.07.40 PM

I can now check that, on the production side, there is a new version available, and I can update to this new version:

Screen Shot 2018-02-07 at 11.10.20 PM.png

I can right click on the process group and update to the latest version:

Screen Shot 2018-02-07 at 11.11.23 PM.png

I select the version I want:

Screen Shot 2018-02-07 at 11.11.33 PM

And that's it! Note that updating from a version A to a version B will stop and restart existing components if and only if those components have been modified. In my case, the HandleHttpRequest processor has not been modified… consequently, the update of the workflow will not cause any service interruption. That's pretty cool!

Also note that the update will not cause any data loss if the update includes the deletion of connections that currently contain flow files.

Finally, I have to update, if necessary, the variables of my newly added process group (for customer data) and start it.

  • Conclusion

I've already talked about way too many things, but I hope it gets you excited about the NiFi Registry! Please install it and use it; it's really going to make your life easier. As always, I invite you to join the Apache NiFi community by subscribing to the mailing lists, submitting JIRAs for NiFi, MiNiFi Java/C++ and NiFi Registry, and contributing code to NiFi, MiNiFi Java/C++ and NiFi Registry.

Feel free to comment and ask questions about this post! Thanks for reading!

Authorizations with LDAP synchronization in Apache NiFi 1.4+

With the release of Apache NiFi 1.4.0, quite a lot of new features are available. One of them is the improved management of users and groups. Until this release, it was possible to configure an LDAP (or Active Directory) server, but it was only used during the authentication process. Once authenticated, a user needed explicit policies to access NiFi resources, and to create a policy for a given user it was first necessary to manually create this user in NiFi's users/groups management view. That time is now over: users/groups lifecycle management is now greatly simplified.

In addition to that, if you are using Apache Ranger as the external authorizer system for NiFi, you can now define rules based on LDAP groups. Before, you had to configure rules in Ranger explicitly based on users.

In this article, we are going to discuss how this is actually working and how you can configure it.

If you're interested in the technical details of the implementation, you can look at the corresponding JIRAs (NIFI-4032, NIFI-4059, NIFI-4127) and GitHub pull requests (#1923, #1978, #2019).

Basically, the authorizer mechanism evolved quite a bit. Before NiFi 1.4, the authorizers.xml file contained a list of configurations for the authorizer implementations you wanted to use to manage policies in NiFi. Unless you developed your own implementation, you had the choice between the FileAuthorizer (the default implementation, which stores the policies in a local file) and the RangerNiFiAuthorizer to use Apache Ranger as the external mechanism managing the policies.

If using the FileAuthorizer, the configuration looked like this (in a single-node installation):

    <authorizer>
        <identifier>file-provider</identifier>
        <class>org.apache.nifi.authorization.FileAuthorizer</class>
        <property name="Authorizations File">./conf/authorizations.xml</property>
        <property name="Users File">./conf/users.xml</property>
        <property name="Initial Admin Identity">admin</property>
        <property name="Legacy Authorized Users File"></property>
    </authorizer>

And we set the corresponding property in the nifi.properties file:

nifi.security.user.authorizer=file-provider

Starting with NiFi 1.4, the authorizers.xml file provides much more functionality (note that the changes are backward compatible and do not require any change on your side if you don't want to use the new features).

Let's start with the new implementation of the authorizer: the Standard Managed Authorizer.

Note – there is also a new Managed Ranger Authorizer, but I won't go into the details of this implementation in this blog. This implementation gives you the possibility to use Apache Ranger as the external system managing the authorizations, but you still have access to the policies in the NiFi UI and you can also manage additional users. It's also this implementation that allows you to define group-based policies in Ranger.

It’s configured as below:

    <authorizer>
        <identifier>managed-authorizer</identifier>
        <class>org.apache.nifi.authorization.StandardManagedAuthorizer</class>
        <property name="Access Policy Provider">file-access-policy-provider</property>
    </authorizer>

This new implementation expects the identifier of the Access Policy Provider implementation you want to use. This new abstraction is used to access and manage users, groups and policies… and to enforce policies when dealing with requests to access NiFi resources. In the above example, our authorizer is identified with the name “managed-authorizer”, and that's what you need to set in nifi.properties to use it:

nifi.security.user.authorizer=managed-authorizer

You can see that this authorizer expects an Access Policy Provider property containing the identifier of the provider you want to use… so let's move on to the Access Policy Provider. For now, there is a single implementation: the FileAccessPolicyProvider. If you already know the previous FileAuthorizer, you shouldn't be very surprised by the expected properties. Here is a configuration example:

    <accessPolicyProvider>
        <identifier>file-access-policy-provider</identifier>
        <class>org.apache.nifi.authorization.FileAccessPolicyProvider</class>
        <property name="User Group Provider">file-user-group-provider</property>
        <property name="Authorizations File">./conf/authorizations.xml</property>
        <property name="Initial Admin Identity"></property>
        <property name="Legacy Authorized Users File"></property>

        <property name="Node Identity 1"></property>
    </accessPolicyProvider>

Note: as you can see the identifier of this Access Policy Provider is “file-access-policy-provider”, and that’s what we referenced in the property of the authorizer (see above).

As with the FileAuthorizer, you have the Initial Admin Identity property, which lets you configure the identity of the user granted admin permissions to set the first policies after a fresh install of NiFi. As the documentation says:

Initial Admin Identity – The identity of an initial admin user that will be granted access to the UI and given the ability to create additional users, groups, and policies. The value of this property could be a DN when using certificates or LDAP, or a Kerberos principal. This property will only be used when there are no other policies defined. If this property is specified then a Legacy Authorized Users File cannot be specified.
NOTE: Any identity mapping rules specified in nifi.properties will also be applied to the initial admin identity, so the value should be the unmapped identity. This identity must be found in the configured User Group Provider.

Then you still have the Legacy Authorized Users File property in case you are upgrading from a NiFi 0.x install and you want to keep your previous policies in place.

You have the Authorizations File property that defines the path to the file that will locally store all the policies. You will also find the Node Identity properties in case you are running a NiFi cluster. Nothing changed on this side, but just in case, here is a quick reminder from the official documentation:

Node Identity [unique key] – The identity of a NiFi cluster node. When clustered, a property for each node should be defined, so that every node knows about every other node. If not clustered these properties can be ignored. The name of each property must be unique, for example for a three nodes cluster: “Node Identity A”, “Node Identity B”, “Node Identity C” or “Node Identity 1”, “Node Identity 2”, “Node Identity 3”.
NOTE: Any identity mapping rules specified in nifi.properties will also be applied to the node identities, so the values should be the unmapped identities (i.e. full DN from a certificate). This identity must be found in the configured User Group Provider.

OK… now we have a new property called “User Group Provider”, and that's where we specify the identifier of the User Group Provider to be used. This User Group Provider is a new abstraction defining how users and groups are automatically retrieved so that policies can be defined on them.

You have multiple implementations available:
  • CompositeUserGroupProvider
  • CompositeConfigurableUserGroupProvider
  • LdapUserGroupProvider
  • FileUserGroupProvider

As the name suggests, the CompositeUserGroupProvider implementation allows you to use multiple implementations of the User Group Provider at the same time. This is very useful, mainly because when using NiFi in clustered mode you need to define some policies for the nodes belonging to the cluster. And, as you may know, in NiFi, nodes are considered users. In case your nodes are not defined in your LDAP or Active Directory, you will certainly want to use the composite implementation.

Then there is the CompositeConfigurableUserGroupProvider implementation, which is the one you will certainly want to use in most cases. This implementation also provides support for retrieving users and groups from multiple sources. The huge difference is that it expects exactly one configurable user group provider: users and groups from the configurable user group provider can be managed from the UI (as when creating users/groups from the NiFi UI in previous versions), whereas users/groups loaded from the other User Group Providers cannot.

Note that it’s up to each User Group provider implementation to define if it is configurable or not. For instance, the LDAP User Group Provider is not configurable: NiFi is not going to manage users and groups in the LDAP/AD server.

A typical configuration will be the definition of the Composite Configurable User Group provider with the File User Group provider as the configurable instance and one instance of the LDAP User Group provider:

    <userGroupProvider>
       <identifier>composite-configurable-user-group-provider</identifier>
       <class>org.apache.nifi.authorization.CompositeConfigurableUserGroupProvider</class>
       <property name="Configurable User Group Provider">file-user-group-provider</property>
       <property name="User Group Provider 1">ldap-user-group-provider</property>
    </userGroupProvider>

In this case, in the definition of the access policy provider, we need to change the property to use the correct user group provider:

    <accessPolicyProvider>
        <identifier>file-access-policy-provider</identifier>
        <class>org.apache.nifi.authorization.FileAccessPolicyProvider</class>
        <property name="User Group Provider">composite-configurable-user-group-provider</property>
        <property name="Authorizations File">./conf/authorizations.xml</property>
        <property name="Initial Admin Identity"></property>
        <property name="Legacy Authorized Users File"></property>

        <property name="Node Identity 1"></property>
    </accessPolicyProvider>

Now, let's look at the File User Group provider. The objective of this provider is to offer the same functionality as before: the user can manage users and groups from the UI and everything is stored locally in a file. The configuration looks like:

    <userGroupProvider>
       <identifier>file-user-group-provider</identifier>
       <class>org.apache.nifi.authorization.FileUserGroupProvider</class>
       <property name="Users File">./conf/users.xml</property>
       <property name="Legacy Authorized Users File"></property>

       <property name="Initial User Identity 1"></property>
    </userGroupProvider>

The initial user identities are users that should be automatically populated when creating the users.xml file for the first time. Typically you would define your initial admin identity here (if this user is not defined via the LDAP user group provider). From the documentation:

Initial User Identity [unique key] – The identity of a user or system to seed the Users File. The name of each property must be unique, for example: “Initial User Identity A”, “Initial User Identity B”, “Initial User Identity C” or “Initial User Identity 1”, “Initial User Identity 2”, “Initial User Identity 3”.

NOTE: Any identity mapping rules specified in nifi.properties will also be applied to the user identities, so the values should be the unmapped identities (i.e. full DN from a certificate).

OK… now let's move on to the last user group provider: the one allowing automatic synchronization of your users and groups with an LDAP/AD server. Here is the configuration part:

    <userGroupProvider>
       <identifier>ldap-user-group-provider</identifier>
       <class>org.apache.nifi.ldap.tenants.LdapUserGroupProvider</class>
       <property name="Authentication Strategy">START_TLS</property>

       <property name="Manager DN"></property>
       <property name="Manager Password"></property>

       <property name="TLS - Keystore"></property>
       <property name="TLS - Keystore Password"></property>
       <property name="TLS - Keystore Type"></property>
       <property name="TLS - Truststore"></property>
       <property name="TLS - Truststore Password"></property>
       <property name="TLS - Truststore Type"></property>
       <property name="TLS - Client Auth"></property>
       <property name="TLS - Protocol"></property>
       <property name="TLS - Shutdown Gracefully"></property>

       <property name="Referral Strategy">FOLLOW</property>
       <property name="Connect Timeout">10 secs</property>
       <property name="Read Timeout">10 secs</property>

       <property name="Url"></property>
       <property name="Page Size"></property>
       <property name="Sync Interval">30 mins</property>

       <property name="User Search Base"></property>
       <property name="User Object Class">person</property>
       <property name="User Search Scope">ONE_LEVEL</property>
       <property name="User Search Filter"></property>
       <property name="User Identity Attribute"></property>
       <property name="User Group Name Attribute"></property>
       <property name="User Group Name Attribute - Referenced Group Attribute"></property>

       <property name="Group Search Base"></property>
       <property name="Group Object Class">group</property>
       <property name="Group Search Scope">ONE_LEVEL</property>
       <property name="Group Search Filter"></property>
       <property name="Group Name Attribute"></property>
       <property name="Group Member Attribute"></property>
       <property name="Group Member Attribute - Referenced User Attribute"></property>
    </userGroupProvider>

You can find the usual parameters that you configured for the LDAP authentication part, but there are also a lot of new parameters to synchronize only specific parts of your remote LDAP/AD server. The documentation says:

‘Url’ – Space-separated list of URLs of the LDAP servers (i.e. ldap://<hostname>:<port>).

‘Page Size’ – Sets the page size when retrieving users and groups. If not specified, no paging is performed.

‘Sync Interval’ – Duration of time between syncing users and groups (i.e. 30 mins). Minimum allowable value is 10 secs.

‘User Search Base’ – Base DN for searching for users (i.e. ou=users,o=nifi). Required to search users.

‘User Object Class’ – Object class for identifying users (i.e. person). Required if searching users.

‘User Search Scope’ – Search scope for searching users (ONE_LEVEL, OBJECT, or SUBTREE). Required if searching users.

‘User Search Filter’ – Filter for searching for users against the ‘User Search Base’ (i.e. (memberof=cn=team1,ou=groups,o=nifi) ). Optional.

‘User Identity Attribute’ – Attribute to use to extract user identity (i.e. cn). Optional. If not set, the entire DN is used.

‘User Group Name Attribute’ – Attribute to use to define group membership (i.e. memberof). Optional. If not set group membership will not be calculated through the users. Will rely on group membership being defined through ‘Group Member Attribute’ if set. The value of this property is the name of the attribute in the user ldap entry that associates them with a group. The value of that user attribute could be a dn or group name for instance. What value is expected is configured in the ‘User Group Name Attribute – Referenced Group Attribute’.

‘User Group Name Attribute – Referenced Group Attribute’ – If blank, the value of the attribute defined in ‘User Group Name Attribute’ is expected to be the full dn of the group. If not blank, this property will define the attribute of the group ldap entry that the value of the attribute defined in ‘User Group Name Attribute’ is referencing (i.e. name). Use of this property requires that ‘Group Search Base’ is also configured.

‘Group Search Base’ – Base DN for searching for groups (i.e. ou=groups,o=nifi). Required to search groups.

‘Group Object Class’ – Object class for identifying groups (i.e. groupOfNames). Required if searching groups.

‘Group Search Scope’ – Search scope for searching groups (ONE_LEVEL, OBJECT, or SUBTREE). Required if searching groups.

‘Group Search Filter’ – Filter for searching for groups against the ‘Group Search Base’. Optional.

‘Group Name Attribute’ – Attribute to use to extract group name (i.e. cn). Optional. If not set, the entire DN is used.

‘Group Member Attribute’ – Attribute to use to define group membership (i.e. member). Optional. If not set group membership will not be calculated through the groups. Will rely on group membership being defined through ‘User Group Name Attribute’ if set. The value of this property is the name of the attribute in the group ldap entry that associates them with a user. The value of that group attribute could be a dn or memberUid for instance. What value is expected is configured in the ‘Group Member Attribute – Referenced User Attribute’. (i.e. member: cn=User 1,ou=users,o=nifi vs. memberUid: user1)

‘Group Member Attribute – Referenced User Attribute’ – If blank, the value of the attribute defined in ‘Group Member Attribute’ is expected to be the full dn of the user. If not blank, this property will define the attribute of the user ldap entry that the value of the attribute defined in ‘Group Member Attribute’ is referencing (i.e. uid). Use of this property requires that ‘User Search Base’ is also configured. (i.e. member: cn=User 1,ou=users,o=nifi vs. memberUid: user1)

NOTE: Any identity mapping rules specified in nifi.properties will also be applied to the user identities. Group names are not mapped.

Please find more information in the documentation here.

If I had to summarize the new authorizers.xml file structure, I could use this image:


Screen Shot 2017-12-22 at 6.25.03 PM

Now that we've discussed the technical details, let's demo it. I'll re-use Apache Directory Studio to set up a local LDAP server, as I did in my article about LDAP authentication with NiFi. I'll skip the details (please refer to that article if needed) and create the following structure:

Screen Shot 2017-12-22 at 4.20.38 PM.png

In a group, I have:

Screen Shot 2017-12-22 at 4.21.43 PM

And for a user, I have:

Screen Shot 2017-12-22 at 4.22.25 PM

Note that I'm using a very bad hack: by default, the ‘memberOf’ attribute is not available unless you define an additional objectClass. As a workaround, I'm using the ‘title’ attribute to represent the membership of a user in different groups. It's quick and dirty, but it'll do for this demo.
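To make the hack concrete, a user entry looks something like this (hypothetical values):

    dn: cn=user1,ou=people,dc=nifi,dc=com
    objectClass: top
    objectClass: person
    cn: user1
    sn: user1
    title: cn=nifi,ou=groups,dc=nifi,dc=com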

Now, here is my authorizers.xml file:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<authorizers>
  <userGroupProvider>
    <identifier>file-user-group-provider</identifier>
    <class>org.apache.nifi.authorization.FileUserGroupProvider</class>
    <property name="Users File">./conf/users.xml</property>
    <property name="Legacy Authorized Users File"></property>
    <property name="Initial User Identity 1"></property>
  </userGroupProvider>

  <userGroupProvider>
    <identifier>ldap-user-group-provider</identifier>
    <class>org.apache.nifi.ldap.tenants.LdapUserGroupProvider</class>
    <property name="Authentication Strategy">SIMPLE</property>

    <property name="Manager DN">uid=admin,ou=system</property>
    <property name="Manager Password">secret</property>

    <property name="Referral Strategy">FOLLOW</property>
    <property name="Connect Timeout">10 secs</property>
    <property name="Read Timeout">10 secs</property>

    <property name="Url">ldap://localhost:10389</property>
    <property name="Page Size"></property>
    <property name="Sync Interval">30 mins</property>

    <property name="User Search Base">ou=people,dc=nifi,dc=com</property>
    <property name="User Object Class">person</property>
    <property name="User Search Scope">ONE_LEVEL</property>
    <property name="User Search Filter">(title=cn=nifi,ou=groups,dc=nifi,dc=com)</property>
    <property name="User Identity Attribute">cn</property>
    <property name="User Group Name Attribute">title</property>
    <property name="User Group Name Attribute - Referenced Group Attribute"></property>

    <property name="Group Search Base"></property>
    <property name="Group Object Class">group</property>
    <property name="Group Search Scope">ONE_LEVEL</property>
    <property name="Group Search Filter"></property>
    <property name="Group Name Attribute">cn</property>
    <property name="Group Member Attribute"></property>
    <property name="Group Member Attribute - Referenced User Attribute"></property>
  </userGroupProvider>

  <userGroupProvider>
    <identifier>composite-configurable-user-group-provider</identifier>
    <class>org.apache.nifi.authorization.CompositeConfigurableUserGroupProvider</class>
    <property name="Configurable User Group Provider">file-user-group-provider</property>
    <property name="User Group Provider 1">ldap-user-group-provider</property>
  </userGroupProvider>

  <accessPolicyProvider>
    <identifier>file-access-policy-provider</identifier>
    <class>org.apache.nifi.authorization.FileAccessPolicyProvider</class>
    <property name="User Group Provider">composite-configurable-user-group-provider</property>
    <property name="Authorizations File">./conf/authorizations.xml</property>
    <property name="Initial Admin Identity">admin</property>
    <property name="Legacy Authorized Users File"></property>
    <property name="Node Identity 1"></property>
 </accessPolicyProvider>

  <authorizer>
    <identifier>managed-authorizer</identifier>
    <class>org.apache.nifi.authorization.StandardManagedAuthorizer</class>
    <property name="Access Policy Provider">file-access-policy-provider</property>
  </authorizer>
</authorizers>

In this case, I decided to go through the users defined in my ‘people’ OU, to filter only the users belonging to the ‘nifi’ group, and to use the ‘cn’ attribute as the username. I also specify that the ‘title’ attribute holds the group membership of a user. This way, NiFi is able to map users to groups. Note that the ‘admin’ user I defined as my initial admin identity is in my LDAP server, so I don't need to define it in the File User Group provider definition.

When starting NiFi and connecting to it as the ‘admin’ user, I can go into the Users view and find:

Screen Shot 2017-12-22 at 4.37.29 PM

Note that the button to add users and groups is available since I used the Composite Configurable User Group provider and defined the File User Group provider as the configurable instance. That's how I would add my nodes as users if I don't want to have the servers in my LDAP/AD.

Also note that users and groups will automatically be synchronized with LDAP/AD based on the “Sync Interval” you specified in the authorizers configuration file.

Finally, as mentioned in the docs, remember that the order is important when using composite providers, in case you have user/group collisions between multiple sources.

With this configuration, I no longer have to care about defining users and groups in NiFi and I can directly create my policies. It's much more efficient to manage everything when people leave or change projects. Cool, isn't it?

Let me know if you have any comment/question.

Monitoring NiFi – Workflow SLA

Note – This article is part of a series discussing subjects around NiFi monitoring.

Depending on the workflows and use cases, you may want to retrieve some metrics per use case/workflow instead of high-level metrics. One of the things you might be looking for is workflow SLA.

First of all, we need to agree on what “Workflow SLA” means. In this article, by SLA (Service-Level Agreement) monitoring I mean ensuring that a workflow processes every single event quickly enough. In other words, if an event takes a long time to be processed, I want to be aware of it so I can understand what is going on. A workflow could be running perfectly in NiFi (no bulletin generated, no error) while the processing time grows because of external causes (resource exhaustion, network bandwidth with external systems, abnormal event size, etc.).

Luckily for us, every single flow file within NiFi contains two core attributes that are perfectly designed for our needs. From the documentation:

  • entryDate: The date and time at which the FlowFile entered the system (i.e., was created). The value of this attribute is a number that represents the number of milliseconds since midnight, Jan. 1, 1970 (UTC).
  • lineageStartDate: Any time that a FlowFile is cloned, merged, or split, this results in a “child” FlowFile being created. As those children are then cloned, merged, or split, a chain of ancestors is built. This value represents the date and time at which the oldest ancestor entered the system. Another way to think about this is that this attribute represents the latency of the FlowFile through the system. The value is a number that represents the number of milliseconds since midnight, Jan. 1, 1970 (UTC).

In our case we are really interested in the “lineageStartDate” attribute, but based on your specific use cases and needs you could also be interested in “entryDate”.

At the end of the workflow I want to monitor, I can use a RouteOnAttribute processor to route the event if and only if the duration since the lineage start date is over a given threshold (let's say one minute for the example below). If an event took longer than expected, I route the flow file to a PutEmail processor to send an email and receive a notification.

Screen Shot 2017-05-10 at 3.33.34 PM.png

My RouteOnAttribute configuration:

Screen Shot 2017-05-10 at 3.34.02 PM.png
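A sketch of the underlying expression, with a dynamic property defining the route and a one-minute (60,000 ms) threshold:

    SLA-exceeded: ${now():toNumber():minus( ${lineageStartDate:toNumber()} ):gt(60000)}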

This way, at a workflow level, you are able to check if the processing time of an event is meeting your requirements. You could obviously follow the same approach to check the processing time at multiple critical points of the workflow.

Note that, instead of sending an email notification, you could just as well use any other kind of processor to integrate this alert/event with whatever system you are using on your side.

One last comment: I recommend checking out the article about the Ambari Reporting Task and Grafana, where I also discuss workflow SLA and how to send the metrics to the Ambari Metrics System and display the information in a Grafana dashboard.

As usual, feel free to ask questions and comment on this post.

Monitoring NiFi – Logback configuration

Note – This article is part of a series discussing subjects around NiFi monitoring.

There is one configuration file in NiFi that manages all the logging operations performed by NiFi: this file is in the configuration directory (NIFI_HOME/conf) and is named logback.xml. It is good to know that this file can be modified “on the fly”: NiFi does not need to be restarted for the modifications to be taken into account.

This file is a standard configuration file for the logback library, a successor of the famous log4j project (here is the official website). I won't get into the details of logback itself (documentation is here), but here is a quick overview of the default configuration when using NiFi.

The most important log file in NiFi is certainly nifi-app.log, which is created according to this configuration block:

    <appender name="APP_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${org.apache.nifi.bootstrap.config.log.dir}/nifi-app.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${org.apache.nifi.bootstrap.config.log.dir}/nifi-app_%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
            <maxFileSize>100MB</maxFileSize>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <immediateFlush>true</immediateFlush>
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="APP_FILE"/>
    </root>

In this case, the log file rolls every hour or whenever the file goes over 100MB, and we only keep up to 30 files' worth of history. Based on your retention policies, you can update this file to meet your requirements.

Also note that, by default, you will have the following log files:

  • ./logs/nifi-app.log
  • ./logs/nifi-bootstrap.log
  • ./logs/nifi-user.log

Now… what can we do to use this file for monitoring purposes? Some options:

  • We can use the TailFile processor of NiFi to let NiFi tail its own log file and perform the required operations when some conditions are met. Example: send an HTTP request when logs of WARN and ERROR levels are detected:

Screen Shot 2017-05-02 at 9.05.14 PM.png

  • We can also define new appenders in the log configuration file and change it according to our needs. In particular, we could be interested in the SMTP appender, which can send logs via email based on quite a large set of conditions. Full documentation here; a sketch follows this list.
  • Obviously, you can also adapt this configuration file so that NiFi log files integrate with your existing systems. An idea could be to configure a Syslog appender to also redirect the logs to an external system.
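A minimal sketch of such an SMTP appender (hypothetical host and addresses; by default it buffers recent events and only sends the email when an ERROR-level event comes in):

    <appender name="EMAIL" class="ch.qos.logback.classic.net.SMTPAppender">
        <smtpHost>smtp.example.com</smtpHost>
        <to>ops-team@example.com</to>
        <from>nifi@example.com</from>
        <subject>NiFi: %logger{20} - %m</subject>
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
        </layout>
    </appender>

    <root level="INFO">
        <appender-ref ref="APP_FILE"/>
        <appender-ref ref="EMAIL"/>
    </root>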

A few remarks regarding the logs, as I see recurrent questions:

  • At the moment, there is no option to have a log file per processor or per process group. This is discussed in NIFI-3065. However, such an approach could have performance implications, since each log message would need to be routed to the correct log file.
  • Also, at the moment, the custom name of a processor is not displayed in the log messages (only the type is used). This is discussed in NIFI-3877. Right now it looks like:

2017-05-12 10:43:37,780 INFO [Timer-Driven Process Thread-4] o.a.nifi.processors.standard.InvokeHTTP InvokeHTTP[id=f7ebb153-015b-1000-f590-599786e16340] my log message…

This could be a solution in a multi-tenant environment (to get one log file per workflow/use case), assuming each “team” follows the same convention to name the components in the workflows.

As usual, feel free to ask questions and comment on this post.

Monitoring NiFi – Bootstrap notifier

Note – This article is part of a series discussing subjects around NiFi monitoring.

When NiFi is running, there is more than one process running on the server. Let’s have a look:

[root@pvillard-hdf-2 ~]# ps -ef | grep nifi
nifi     19380     1  0 12:33 ?        00:00:00 /bin/sh /usr/hdf/current/nifi/bin/nifi.sh start
nifi     19382 19380  0 12:33 ?        00:00:06 /usr/java/jdk1.8.0_131//bin/java -cp /usr/hdf/current/nifi/conf:/usr/hdf/current/nifi/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.log.dir=/var/log/nifi -Dorg.apache.nifi.bootstrap.config.pid.dir=/var/run/nifi -Dorg.apache.nifi.bootstrap.config.file=/usr/hdf/current/nifi/conf/bootstrap.conf org.apache.nifi.bootstrap.RunNiFi start
nifi     19419 19382  7 12:33 ?        00:07:36 /usr/java/jdk1.8.0_131/bin/java -classpath /usr/hdf/current/nifi/conf:/usr/hdf/current/nifi/lib/jcl-over-slf4j-1.7.12.jar:/usr/hdf/current/nifi/lib/jul-to-slf4j-1.7.12.jar:/usr/hdf/current/nifi/lib/log4j-over-slf4j-1.7.12.jar:/usr/hdf/current/nifi/lib/logback-classic-1.1.3.jar:/usr/hdf/current/nifi/lib/logback-core-1.1.3.jar:/usr/hdf/current/nifi/lib/nifi-runtime-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-api-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-documentation-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-framework-api-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-nar-utils-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/slf4j-api-1.7.12.jar:/usr/hdf/current/nifi/lib/nifi-properties-1.1.0.2.1.2.0-10.jar -Dorg.apache.jasper.compiler.disablejsr199=true -Xmx512m -Xms512m -Dambari.application.id=nifi -Dambari.metrics.collector.url=http://pvillard-hdf-1:6188/ws/v1/timeline/metrics -Dsun.net.http.allowRestrictedHeaders=true -Djava.net.preferIPv4Stack=true -Djava.awt.headless=true -XX:+UseG1GC -Djava.protocol.handler.pkgs=sun.net.www.protocol -Dnifi.properties.file.path=/usr/hdf/current/nifi/conf/nifi.properties -Dnifi.bootstrap.listen.port=39585 -Dapp=NiFi -Dorg.apache.nifi.bootstrap.config.log.dir=/var/log/nifi org.apache.nifi.NiFi -k A2EA52795B33AB2F21C93E7E820D08369F1448478C877F4C710D6E85FD904AE6

As you can see, the script runs a bootstrap process (a JVM with between 12 and 24MB of heap) that is in charge of the NiFi JVM itself. In this example, the script is running with PID 19380 and is the parent of the bootstrap process running with PID 19382. The bootstrap itself is the parent of NiFi, running with PID 19419.

If you kill only the NiFi process, the bootstrap will detect it and automatically relaunch NiFi for you:

kill -9 19419

Then I have:

nifi     19380     1  0 12:33 ?        00:00:00 /bin/sh /usr/hdf/current/nifi/bin/nifi.sh start
nifi     19382 19380  0 12:33 ?        00:00:06 /usr/java/jdk1.8.0_131//bin/java -cp /usr/hdf/current/nifi/conf:/usr/hdf/current/nifi/lib/bootstrap/* -Xms12m -Xmx24m -Dorg.apache.nifi.bootstrap.config.log.dir=/var/log/nifi -Dorg.apache.nifi.bootstrap.config.pid.dir=/var/run/nifi -Dorg.apache.nifi.bootstrap.config.file=/usr/hdf/current/nifi/conf/bootstrap.conf org.apache.nifi.bootstrap.RunNiFi start
nifi     24702 19382 99 14:20 ?        00:00:05 /usr/java/jdk1.8.0_131/bin/java -classpath /usr/hdf/current/nifi/conf:/usr/hdf/current/nifi/lib/jcl-over-slf4j-1.7.12.jar:/usr/hdf/current/nifi/lib/jul-to-slf4j-1.7.12.jar:/usr/hdf/current/nifi/lib/log4j-over-slf4j-1.7.12.jar:/usr/hdf/current/nifi/lib/logback-classic-1.1.3.jar:/usr/hdf/current/nifi/lib/logback-core-1.1.3.jar:/usr/hdf/current/nifi/lib/nifi-runtime-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-api-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-documentation-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-framework-api-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/nifi-nar-utils-1.1.0.2.1.2.0-10.jar:/usr/hdf/current/nifi/lib/slf4j-api-1.7.12.jar:/usr/hdf/current/nifi/lib/nifi-properties-1.1.0.2.1.2.0-10.jar -Dorg.apache.jasper.compiler.disablejsr199=true -Xmx512m -Xms512m -Dambari.application.id=nifi -Dambari.metrics.collector.url=http://pvillard-hdf-1:6188/ws/v1/timeline/metrics -Dsun.net.http.allowRestrictedHeaders=true -Djava.net.preferIPv4Stack=true -Djava.awt.headless=true -XX:+UseG1GC -Djava.protocol.handler.pkgs=sun.net.www.protocol -Dnifi.properties.file.path=/usr/hdf/current/nifi/conf/nifi.properties -Dnifi.bootstrap.listen.port=39585 -Dapp=NiFi -Dorg.apache.nifi.bootstrap.config.log.dir=/var/log/nifi org.apache.nifi.NiFi -k A2EA52795B33AB2F21C93E7E820D08369F1448478C877F4C710D6E85FD904AE6

As you can see the NiFi process has a new PID since it is a new process launched by the bootstrap.

You can find more information about the bootstrap system in the administration guide. One interesting feature of this bootstrap approach is the bootstrap notifier: it allows you to configure a notification service that will be triggered when the bootstrap starts, stops or detects an interruption of the NiFi process.

With the new version of Apache NiFi (1.2.0), you can send notifications to an HTTP(S) endpoint. If you need a custom notification service (example: sending SNMP traps), it's not that hard: you just need to extend AbstractNotificationService. You can have a look at the two existing implementations: the email notifier and the HTTP notifier.

Let’s configure our NiFi to use the email notification service and see the result. On each node of our cluster, we need to update the bootstrap.conf configuration file as below:

###
# Notification Services for notifying interested parties when NiFi is stopped, started, dies
###

# XML File that contains the definitions of the notification services
notification.services.file=./conf/bootstrap-notification-services.xml

# In the case that we are unable to send a notification for an event, how many times should we retry?
notification.max.attempts=5

# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi is started?
nifi.start.notification.services=email-notification

# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi is stopped?
nifi.stop.notification.services=email-notification

# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi dies?
nifi.dead.notification.services=email-notification

In this case, we are saying that the configuration of our notification services (we can define and use multiple notifiers) is in the file

./conf/bootstrap-notification-services.xml

and that we want to use the notifier called “email-notification” (that’s the default name defined in the XML configuration file) for stop, start, and dead events. As stated in the documentation, it’s possible to define a list of notifiers for each type of event.

For this demonstration, I’ll use my Gmail account and the Gmail SMTP server to send the notifications (obviously, with this example, NiFi needs public internet access to reach the SMTP server). Here is the configuration file:

<services>
     <service>
        <id>email-notification</id>
        <class>org.apache.nifi.bootstrap.notification.email.EmailNotificationService</class>
        <property name="SMTP Hostname">smtp.gmail.com</property>
        <property name="SMTP Port">587</property>
        <property name="SMTP Username">my-email-address@gmail.com</property>
        <property name="SMTP Password">myPassword</property>
        <property name="SMTP TLS">true</property>
        <property name="From">"NiFi Service Notifier"</property>
        <property name="To">email address that will receive the notifications</property>
     </service>
</services>
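
Note that these files are read by the bootstrap when it starts, so a restart is needed for the changes to be taken into account (with HDF, restarting the service through Ambari does the job):

/usr/hdf/current/nifi/bin/nifi.sh restart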

Here are the emails I received when restarting NiFi or killing the NiFi process:

  • Start event

Title: NiFi Started on Host pvillard-hdf-2 (172.26.249.33)

Hello,

Apache NiFi has been started on host pvillard-hdf-2 (172.26.249.33) at 2017/04/27 22:15:01.376 by user nifi

  • Stop event

Title: NiFi Stopped on Host pvillard-hdf-2 (172.26.249.33)

Hello,

Apache NiFi has been told to initiate a shutdown on host pvillard-hdf-2 (172.26.249.33) at 2017/04/27 22:14:35.702 by user nifi

  • Dead event

Title: NiFi Died on Host pvillard-hdf-2 (172.26.249.33)

Hello,

It appears that Apache NiFi has died on host pvillard-hdf-2 (172.26.249.33) at 2017/04/28 09:52:01.973; automatically restarting NiFi

OK, now let’s see how to configure the HTTP notification service. Let’s say I have a web server listening here:

http://pvillard-hdf-4:9999/notification

My XML configuration is now:

<services>
     <service>
        <id>email-notification</id>
        <class>org.apache.nifi.bootstrap.notification.email.EmailNotificationService</class>
        <property name="SMTP Hostname">smtp.gmail.com</property>
        <property name="SMTP Port">587</property>
        <property name="SMTP Username">my-email-address@gmail.com</property>
        <property name="SMTP Password">myPassword</property>
        <property name="SMTP TLS">true</property>
        <property name="From">"NiFi Service Notifier"</property>
        <property name="To">email address that will receive the notifications</property>
     </service>
     <service>
        <id>http-notification</id>
        <class>org.apache.nifi.bootstrap.notification.http.HttpNotificationService</class>
        <property name="URL">http://pvillard-hdf-4:9999/notification</property>
     </service>
</services>

And my bootstrap.conf configuration file now contains:

###
# Notification Services for notifying interested parties when NiFi is stopped, started, dies
###

# XML File that contains the definitions of the notification services
notification.services.file=./conf/bootstrap-notification-services.xml

# In the case that we are unable to send a notification for an event, how many times should we retry?
notification.max.attempts=5

# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi is started?
nifi.start.notification.services=email-notification,http-notification

# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi is stopped?
nifi.stop.notification.services=email-notification,http-notification

# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi dies?
nifi.dead.notification.services=email-notification,http-notification

I now restart NiFi, and I can confirm that the notification is received (I used a standalone NiFi instance as the receiving endpoint):

Screen Shot 2017-05-02 at 5.15.29 PM
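
If you don’t have a spare NiFi instance at hand to play the receiver role, a throwaway JDK-only listener is enough to eyeball the requests. This is just a test sketch of mine, matching the port and path configured above:

import com.sun.net.httpserver.HttpServer;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.InetSocketAddress;

public class NotificationSink {
    public static void main(String[] args) throws Exception {
        // Same port and path as the URL configured in the http-notification service
        HttpServer server = HttpServer.create(new InetSocketAddress(9999), 0);
        server.createContext("/notification", exchange -> {
            ByteArrayOutputStream body = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int read;
            try (InputStream in = exchange.getRequestBody()) {
                while ((read = in.read(buffer)) != -1) {
                    body.write(buffer, 0, read);
                }
            }
            // Dump the request headers and the notification message
            System.out.println(exchange.getRequestHeaders().entrySet());
            System.out.println(body.toString("UTF-8"));
            exchange.sendResponseHeaders(200, -1); // 200 OK with an empty body
            exchange.close();
        });
        server.start();
    }
}

Run it with java NotificationSink on the host referenced in the URL and watch the console when NiFi starts or stops.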

The content of the notification is the same as with the email notification service. Note that it’s also possible to configure properties for a keystore and a truststore to send notifications using HTTPS:

<service>
   <id>http-notification</id>
   <class>org.apache.nifi.bootstrap.notification.http.HttpNotificationService</class>

   <!-- The URL to send the notification to -->
   <property name="URL"></property>

   <!-- Max wait time for connection to remote service - default is 10s -->
   <property name="Connection timeout"></property>

   <!-- Max wait time for remote service to read the request sent - default is 10s -->
   <property name="Write timeout"></property>

   <!-- The fully-qualified filename of the Truststore -->
   <property name="Truststore Filename"></property>

   <!-- The Type of the Truststore. Either JKS or PKCS12 -->
   <property name="Truststore Type"></property>

   <!-- The password for the Truststore -->
   <property name="Truststore Password"></property>

   <!-- The fully-qualified filename of the Keystore -->
   <property name="Keystore Filename"></property>

   <!-- The Type of the Keystore. Either JKS or PKCS12 -->
   <property name="Keystore Type"></property>

   <!-- The password for the Keystore -->
   <property name="Keystore Password"></property>

   <!-- The password for the key. If this is not specified, but the Keystore Filename, Password, and Type are specified, then the Keystore Password will be assumed to be the same as the Key Password -->
   <property name="Key Password"></property>

   <!-- The algorithm to use for this SSL context. Either TLS or SSL -->
   <property name="SSL Protocol"></property>
</service>
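
For example, a TLS-enabled configuration could look like the following (the URL, file paths and passwords are placeholders to adapt to your environment):

<service>
   <id>http-notification</id>
   <class>org.apache.nifi.bootstrap.notification.http.HttpNotificationService</class>
   <property name="URL">https://pvillard-hdf-4:9999/notification</property>
   <property name="Truststore Filename">/etc/security/ssl/truststore.jks</property>
   <property name="Truststore Type">JKS</property>
   <property name="Truststore Password">myTruststorePassword</property>
   <property name="Keystore Filename">/etc/security/ssl/keystore.jks</property>
   <property name="Keystore Type">JKS</property>
   <property name="Keystore Password">myKeystorePassword</property>
   <property name="SSL Protocol">TLS</property>
</service>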

Note that I also received an email notification since I specified the two notifiers in the configuration.

Again, if you need a custom notifier, it should not be too difficult to write one, and you are more than welcome to contribute your notifier code to the Apache community. It will give more options to users when setting up a bootstrap notifier.

As usual, feel free to ask questions and comment on this post.

Monitoring NiFi – Introduction

Apache NiFi 1.2.0 has just been released with a lot of very cool new features… and I’m taking this opportunity to start a series of articles around monitoring. This is a recurring subject and I often hear the same questions. This series won’t provide an exhaustive list of the ways to monitor NiFi (with or without HDF) but, at least, it should get you started!

Here is a quick summary of the subjects that will be covered:

For this series of articles, I will use a 4-node HDF cluster (running on CentOS 7) as a demo environment:

I’m using HDF to take advantage of Ambari to ease the deployment, but this is not mandatory for what I’m going to discuss in these articles (except, obviously, for the parts around the Ambari reporting task).

I will not cover how to set up this environment, but if this is something you are looking for, feel free to ask questions (here or on the Hortonworks Community Connection) and to have a look at the Hortonworks documentation about HDF.

I don’t want to write a single (very) long article, so for the sake of clarity there is one article per listed subject. Also, I’ll try to keep the articles updated over time so they stick as closely as possible to the latest features provided by NiFi.

Also, if you feel that some subjects should be added to the list, let me know and I’ll do my best to cover other monitoring-related questions.

List/Fetch pattern and Remote Process Group in Apache NiFi

I see a lot of questions about how the List[X]/Fetch[X] processors work and how to load-balance data over the nodes of a NiFi cluster once the data is already in the cluster. Since the question comes up quite often, let’s discuss the subject and try to understand how things work here.

I will assume that you are running a NiFi cluster, since data balancing is not an issue with a standalone instance 😉

The first thing to understand is: when running a cluster, one of the nodes is randomly designated as the “Primary node”. The election takes place when the cluster starts, and there is no way to decide which node will be the primary node. OK… you could force things when your cluster starts, but there is no point in doing so if you want real high availability. The short version is: any node may become the Primary node at some point, so don’t assume that the Primary node will always be one particular node.

The second thing to understand is: the flow that you are designing on your canvas is running on all the nodes independently. Each node of the cluster is responsible for its own data, and a relationship between two processors does not mean that the data going into this relationship will be balanced over the nodes. Unless you use a Remote Process Group (see below), the data will remain on the same node from the beginning to the end of the flow.

I will use the following example to illustrate my explanations: I want to get files from a remote SFTP server and put them into HDFS.

  • First idea (bad idea!) / GetSFTP -> PutHDFS

Screen Shot 2017-02-23 at 11.04.42 AM.png

The first option could be the Get/Put pattern, which is perfectly fine with a standalone instance. However, this will cause issues if you have a NiFi cluster. Remember? The flow is running on all the hosts of your cluster. The problem is that you will have concurrent accesses from your nodes to the same files on the SFTP server… and if the processor is configured to delete the file once retrieved (the default behavior), you will see errors showing up. Conclusion: always keep in mind that a processor runs on all the nodes and can cause concurrent access errors depending on the remote system.

  • Second idea (not efficient!) / GetSFTP on Primary Node -> PutHDFS

The second option is to configure the GetSFTP processor to only run on the Primary Node (in the Scheduling tab of the processor configuration):

Screen Shot 2017-02-23 at 11.10.12 AM.png

This way, you will solve the concurrent access issue since only one node of your cluster (the Primary node) will run the GetSFTP processor.

Brief aside: remember, the flow is running on all the nodes; however, if a processor is configured to run on the primary node only, it simply won’t be scheduled on the other nodes. That’s all.

The problem with this approach is that it’s not efficient at all. The first reason is that you get data through only one node (this does not scale), and, in the end, only the primary node of your cluster is actually handling the data. Why? Because, unless you explicitly use a remote process group, the data remains on the same node from the beginning to the end. In this case, only the primary node will actually get data from the SFTP server and push it into HDFS.

  • Recommended pattern : ListSFTP -> RPG / Input Port -> FetchSFTP -> PutHDFS

To solve these issues, the List/Fetch pattern has been developed and is widely used across a lot of processors. The idea is the following: the List processor only lists the data to retrieve on the remote system and gets the associated metadata (it does not retrieve the data itself). For each listed item, a flow file (with no content) is generated and its attributes are populated with the metadata. The flow file is then sent to the Fetch processor, which actually retrieves the data from the remote system based on the metadata (for example, the path of the file on the remote system). Since each flow file contains the metadata of a specific item on the remote system, you won’t have concurrent accesses even if you have multiple Fetch processors running in parallel.

Obviously, the List processor is meant to run on the Primary node only. Then you have to balance the generated flow files over the nodes so that the Fetch processor on each node has flow files to process. For this purpose, you have to use a Remote Process Group.

A Remote Process Group is an abstract object used to connect two NiFi setups together (the communication between the two NiFi instances is what we call Site-to-Site, or S2S). It can connect a MiNiFi instance to a NiFi cluster, a NiFi cluster to another NiFi cluster, a standalone NiFi to a NiFi cluster, etc. And it can also be used to connect a NiFi cluster to itself! This way, the flow files will be balanced over all the nodes of the cluster. A few things to know about a Remote Process Group:

  1. You need to have an input port on the remote instance you are connecting to (in our case, you need a remote input port on your canvas).
  2. The address you give when configuring your remote process group does not matter in terms of high availability: once the connection is established with one of the nodes of the remote instance, the remote process group will be aware of all the nodes of the remote instance and will manage the case where the node specified in the address goes down.
  3. Your instances need to be configured to allow remote access. The required properties are:

# Site to Site properties
nifi.remote.input.host=
nifi.remote.input.secure=
nifi.remote.input.socket.port=
nifi.remote.input.http.enabled=
nifi.remote.input.http.transaction.ttl=
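
As an illustration, on an unsecured cluster using the raw socket transport, this could look like the values below (hostname and port are just examples, and nifi.remote.input.host must be set to each node’s own hostname):

# Site to Site properties
nifi.remote.input.host=pvillard-hdf-2
nifi.remote.input.secure=false
nifi.remote.input.socket.port=10000
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec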

In the case of our SFTP example, it looks like:

Screen Shot 2017-02-23 at 11.47.40 AM.png

Let’s try to understand what is going on from a cluster perspective. Here is what we have in the case of a 3-nodes NiFi cluster with ListSFTP running on the primary node only:

Screen Shot 2017-02-23 at 12.03.22 PM.png

When scheduled, the ListSFTP processor is going to list the three files on my remote SFTP server and will generate one flow file per remote file. Each flow file won’t have any content but will carry attributes with the metadata of the remote file. In the case of ListSFTP, I’ll have (check the documentation under the “Writes Attributes” section):

Name                   Description
sftp.remote.host       The hostname of the SFTP Server
sftp.remote.port       The port that was connected to on the SFTP Server
sftp.listing.user      The username of the user that performed the SFTP Listing
file.owner             The numeric owner id of the source file
file.group             The numeric group id of the source file
file.permissions       The read/write/execute permissions of the source file
file.lastModifiedTime  The timestamp of when the file in the filesystem was last modified, as yyyy-MM-dd'T'HH:mm:ssZ
filename               The name of the file on the SFTP Server
path                   The fully qualified name of the directory on the SFTP Server from which the file was pulled

The ListSFTP processor will generate 3 flow files and, for now, all flow files are only on the primary node:

Screen Shot 2017-02-23 at 1.59.43 PM.png

Now the Remote Process Group has been configured to connect to the cluster itself, and I set the relationship going from ListSFTP to the Remote Process Group to connect with the input port I created (you may have multiple input ports in the remote system, and you can choose which one to connect to, depending on your needs). Once communication is enabled on the RPG (Remote Process Group), the RPG is aware of the three nodes and will balance the data across the remote nodes (be aware that there are a lot of Site-to-Site parameters to improve efficiency). In my case, that would give something like:

Screen Shot 2017-02-23 at 1.59.55 PM.png

Note: that would be an ideal case in terms of balancing but, for efficiency purposes, the Site-to-Site mechanism might send batches of flow files to the remote nodes. In the above example, with only 3 flow files, I would probably not end up with one flow file per node.

Now, since we have everything in the attributes of our flow files, we need to use the Expression Language to set the properties of the FetchSFTP processor to use the attributes of the incoming flow files:

screen-shot-2017-02-23-at-1-55-13-pm
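
Based on the attribute table above, the FetchSFTP properties would typically be along these lines (password management aside):

Hostname     ${sftp.remote.host}
Port         ${sftp.remote.port}
Username     ${sftp.listing.user}
Remote File  ${path}/${filename}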

This way, each instance of the FetchSFTP processor will take care of its own file (to actually fetch the content of the remote data) and there won’t be any concurrent access:

Screen Shot 2017-02-23 at 2.00.12 PM.png

All your nodes are retrieving data, and you really can scale your cluster up or down depending on your requirements. Note also that PutHDFS won’t be an issue either, since each node will write its own file.

As I said previously, a lot of processors embrace this pattern (and this is the recommended way to use such processors with a NiFi cluster), and I’d strongly encourage you to do the same when developing your custom processors.

As always questions/comments are welcome.