Authorizations with LDAP synchronization in Apache NiFi 1.4+

With the release of Apache NiFi 1.4.0, quite a lot of new features are available. One of them is the improved management of users and groups. Until this release, it was possible to configure an LDAP (or Active Directory) server, but it was only used during the authentication process. Once a user was authenticated, it was necessary to have explicit policies for this user to access NiFi resources. And to create a policy for a given user, it was first necessary to manually create this user in the NiFi users/groups management view. Those days are over: users/groups management is now greatly simplified in terms of lifecycle management.

In addition to that, if you are using Apache Ranger as the external authorizer system for NiFi, you can now define rules based on LDAP groups. Before, you had to configure, in Ranger, rules explicitly based on users.

In this article, we are going to discuss how this actually works and how you can configure it.

If you’re interested in the technical details of the implementation, you can look at the corresponding JIRAs (NIFI-4032, NIFI-4059, NIFI-4127) and GitHub pull requests (#1923, #1978, #2019).

Basically, the authorizer mechanism evolved quite a bit. Before NiFi 1.4, the authorizers.xml file contained a list of configurations for the authorizer implementations you wanted to use to manage policies in NiFi. Unless you developed your own implementation, you had the choice between the FileAuthorizer (the default implementation that stores the policies in a local file) and the RangerNiFiAuthorizer to use Apache Ranger as the external mechanism managing the policies.

If using the FileAuthorizer, the configuration looked like this (on a single-node installation):

    <authorizer>
        <identifier>file-provider</identifier>
        <class>org.apache.nifi.authorization.FileAuthorizer</class>
        <property name="Authorizations File">./conf/authorizations.xml</property>
        <property name="Users File">./conf/users.xml</property>
        <property name="Initial Admin Identity”>admin</property>
        <property name="Legacy Authorized Users File"></property>
    </authorizer>

And we set the corresponding property in the nifi.properties file:

nifi.security.user.authorizer=file-provider

Starting with NiFi 1.4, the authorizers.xml file provides much more functionality (note that the changes are backward compatible and do not require any modification on your side if you don’t want to use the new features).

Let’s start with the new implementation of the authorizer: the Standard Managed Authorizer.

Note – there is also a new Managed Ranger Authorizer, but I won’t go into the details of this implementation in this blog. This implementation gives you the possibility to use Apache Ranger as the external system managing the authorizations but you still have access to the policies in the NiFi UI, and you can also manage additional users. It’s also this implementation that allows you to define group-based policies in Ranger.

It’s configured as below:

    <authorizer>
        <identifier>managed-authorizer</identifier>
        <class>org.apache.nifi.authorization.StandardManagedAuthorizer</class>
        <property name="Access Policy Provider">file-access-policy-provider</property>
    </authorizer>

This new implementation expects the identifier of the Access Policy Provider implementation you want to use. This new abstraction will be used to access and manage users, groups and policies… and to enforce policies when access to NiFi resources is requested. In the above example, our authorizer is identified with the name “managed-authorizer”, and that’s what you need to set in nifi.properties to use it:

nifi.security.user.authorizer=managed-authorizer

You can see that this authorizer expects a property Access Policy Provider with the identifier of the provider you want to use… Let’s move on to the Access Policy Provider. For now, there is a single implementation which is the FileAccessPolicyProvider. If you already know about the previous FileAuthorizer, you shouldn’t be very surprised by the expected properties. Here is a configuration example:

    <accessPolicyProvider>
        <identifier>file-access-policy-provider</identifier>
        <class>org.apache.nifi.authorization.FileAccessPolicyProvider</class>
        <property name="User Group Provider">file-user-group-provider</property>
        <property name="Authorizations File">./conf/authorizations.xml</property>
        <property name="Initial Admin Identity"></property>
        <property name="Legacy Authorized Users File"></property>

        <property name="Node Identity 1"></property>
    </accessPolicyProvider>

Note: as you can see the identifier of this Access Policy Provider is “file-access-policy-provider”, and that’s what we referenced in the property of the authorizer (see above).

As with the FileAuthorizer, you have the Initial Admin Identity property which lets you configure the identity of the user with the admin permissions to set the first policies after a fresh install of NiFi. As the documentation says:

Initial Admin Identity – The identity of an initial admin user that will be granted access to the UI and given the ability to create additional users, groups, and policies. The value of this property could be a DN when using certificates or LDAP, or a Kerberos principal. This property will only be used when there are no other policies defined. If this property is specified then a Legacy Authorized Users File cannot be specified.
NOTE: Any identity mapping rules specified in nifi.properties will also be applied to the initial admin identity, so the value should be the unmapped identity. This identity must be found in the configured User Group Provider.

Then you still have the Legacy Authorized Users File property in case you are upgrading from a NiFi 0.x install and you want to keep your previous policies in place.

You have the Authorizations File property that defines the path to the file that will locally store all the policies. You also find the Node Identity properties in case you are in a NiFi cluster. Nothing changed on this side, but just in case, a quick reminder from the official documentation:

Node Identity [unique key] – The identity of a NiFi cluster node. When clustered, a property for each node should be defined, so that every node knows about every other node. If not clustered these properties can be ignored. The name of each property must be unique, for example for a three nodes cluster: “Node Identity A”, “Node Identity B”, “Node Identity C” or “Node Identity 1”, “Node Identity 2”, “Node Identity 3”.
NOTE: Any identity mapping rules specified in nifi.properties will also be applied to the node identities, so the values should be the unmapped identities (i.e. full DN from a certificate). This identity must be found in the configured User Group Provider.

OK… now we have a new property called “User Group Provider” and that’s where we’re going to specify the identifier of the User Group Provider to be used. This User Group Provider is a new abstraction allowing you to define how users and groups should be automatically retrieved to then define policies on them.

You have multiple implementations available:
  • CompositeUserGroupProvider
  • CompositeConfigurableUserGroupProvider
  • LdapUserGroupProvider
  • FileUserGroupProvider

As the name suggests, the CompositeUserGroupProvider implementation allows you to use multiple implementations of the User Group Provider at the same time. This is very useful, mainly because when using NiFi in clustered mode, you need to define some policies for the nodes belonging to the cluster. And, as you may know, in NiFi, nodes are considered users. In case your nodes are not defined in your LDAP or Active Directory, you will certainly want to use the composite implementation.

Now you need to consider the CompositeConfigurableUserGroupProvider implementation, which is the one you will certainly want to use in most cases. This implementation also provides support for retrieving users and groups from multiple sources. The big difference is that it expects a single configurable user group provider. It means that users and groups from the configurable user group provider can be managed from the UI (as you did when creating users/groups from the NiFi UI in previous versions). However, users/groups loaded from one of the other User Group Providers cannot.

Note that it’s up to each User Group provider implementation to define if it is configurable or not. For instance, the LDAP User Group Provider is not configurable: NiFi is not going to manage users and groups in the LDAP/AD server.

A typical configuration will be the definition of the Composite Configurable User Group provider with the File User Group provider as the configurable instance and one instance of the LDAP User Group provider:

    <userGroupProvider>
       <identifier>composite-configurable-user-group-provider</identifier>
       <class>org.apache.nifi.authorization.CompositeConfigurableUserGroupProvider</class>
       <property name="Configurable User Group Provider">file-user-group-provider</property>
       <property name="User Group Provider 1">ldap-user-group-provider</property>
    </userGroupProvider>

In this case, in the definition of the access policy provider, we need to change the property to use the correct user group provider:

    <accessPolicyProvider>
        <identifier>file-access-policy-provider</identifier>
        <class>org.apache.nifi.authorization.FileAccessPolicyProvider</class>
        <property name="User Group Provider">composite-configurable-user-group-provider</property>
        <property name="Authorizations File">./conf/authorizations.xml</property>
        <property name="Initial Admin Identity"></property>
        <property name="Legacy Authorized Users File"></property>

        <property name="Node Identity 1"></property>
    </accessPolicyProvider>

Now, let’s look at the File User Group Provider. The objective of this provider is to provide the same functionality as before: the user can manage users and groups from the UI and everything is stored locally in a file. The configuration looks like this:

    <userGroupProvider>
       <identifier>file-user-group-provider</identifier>
       <class>org.apache.nifi.authorization.FileUserGroupProvider</class>
       <property name="Users File">./conf/users.xml</property>
       <property name="Legacy Authorized Users File"></property>

       <property name="Initial User Identity 1"></property>
    </userGroupProvider>

The initial user identities are users that should automatically be populated when creating the users.xml file for the first time. Typically, you would define here your initial admin identity (if this user is not defined via the LDAP user group provider). From the documentation:

Initial User Identity [unique key] – The identity of a users and systems to seed the Users File. The name of each property must be unique, for example: “Initial User Identity A”, “Initial User Identity B”, “Initial User Identity C” or “Initial User Identity 1”, “Initial User Identity 2”, “Initial User Identity 3”.

NOTE: Any identity mapping rules specified in nifi.properties will also be applied to the user identities, so the values should be the unmapped identities (i.e. full DN from a certificate).

OK… now let’s move to the last user group provider: the one allowing an automatic synchronization of your users and groups with an LDAP/AD server. Here is the configuration part:

    <userGroupProvider>
       <identifier>ldap-user-group-provider</identifier>
       <class>org.apache.nifi.ldap.tenants.LdapUserGroupProvider</class>
       <property name="Authentication Strategy">START_TLS</property>

       <property name="Manager DN"></property>
       <property name="Manager Password"></property>

       <property name="TLS - Keystore"></property>
       <property name="TLS - Keystore Password"></property>
       <property name="TLS - Keystore Type"></property>
       <property name="TLS - Truststore"></property>
       <property name="TLS - Truststore Password"></property>
       <property name="TLS - Truststore Type"></property>
       <property name="TLS - Client Auth"></property>
       <property name="TLS - Protocol"></property>
       <property name="TLS - Shutdown Gracefully"></property>

       <property name="Referral Strategy">FOLLOW</property>
       <property name="Connect Timeout">10 secs</property>
       <property name="Read Timeout">10 secs</property>

       <property name="Url"></property>
       <property name="Page Size"></property>
       <property name="Sync Interval">30 mins</property>

       <property name="User Search Base"></property>
       <property name="User Object Class">person</property>
       <property name="User Search Scope">ONE_LEVEL</property>
       <property name="User Search Filter"></property>
       <property name="User Identity Attribute"></property>
       <property name="User Group Name Attribute"></property>
       <property name="User Group Name Attribute - Referenced Group Attribute"></property>

       <property name="Group Search Base"></property>
       <property name="Group Object Class">group</property>
       <property name="Group Search Scope">ONE_LEVEL</property>
       <property name="Group Search Filter"></property>
       <property name="Group Name Attribute"></property>
       <property name="Group Member Attribute"></property>
       <property name="Group Member Attribute - Referenced User Attribute"></property>
    </userGroupProvider>

You can find the usual parameters that you configured for the LDAP authentication part, but there are also a lot of new parameters to synchronize only specific parts of your remote LDAP/AD server. The documentation says:

‘Url’ – Space-separated list of URLs of the LDAP servers (i.e. ldap://<hostname>:<port>).

‘Page Size’ – Sets the page size when retrieving users and groups. If not specified, no paging is performed.

‘Sync Interval’ – Duration of time between syncing users and groups (i.e. 30 mins). Minimum allowable value is 10 secs.

‘User Search Base’ – Base DN for searching for users (i.e. ou=users,o=nifi). Required to search users.

‘User Object Class’ – Object class for identifying users (i.e. person). Required if searching users.

‘User Search Scope’ – Search scope for searching users (ONE_LEVEL, OBJECT, or SUBTREE). Required if searching users.

‘User Search Filter’ – Filter for searching for users against the ‘User Search Base’ (i.e. (memberof=cn=team1,ou=groups,o=nifi) ). Optional.

‘User Identity Attribute’ – Attribute to use to extract user identity (i.e. cn). Optional. If not set, the entire DN is used.

‘User Group Name Attribute’ – Attribute to use to define group membership (i.e. memberof). Optional. If not set group membership will not be calculated through the users. Will rely on group membership being defined through ‘Group Member Attribute’ if set. The value of this property is the name of the attribute in the user ldap entry that associates them with a group. The value of that user attribute could be a dn or group name for instance. What value is expected is configured in the ‘User Group Name Attribute – Referenced Group Attribute’.

‘User Group Name Attribute – Referenced Group Attribute’ – If blank, the value of the attribute defined in ‘User Group Name Attribute’ is expected to be the full dn of the group. If not blank, this property will define the attribute of the group ldap entry that the value of the attribute defined in ‘User Group Name Attribute’ is referencing (i.e. name). Use of this property requires that ‘Group Search Base’ is also configured.

‘Group Search Base’ – Base DN for searching for groups (i.e. ou=groups,o=nifi). Required to search groups.

‘Group Object Class’ – Object class for identifying groups (i.e. groupOfNames). Required if searching groups.

‘Group Search Scope’ – Search scope for searching groups (ONE_LEVEL, OBJECT, or SUBTREE). Required if searching groups.

‘Group Search Filter’ – Filter for searching for groups against the ‘Group Search Base’. Optional.

‘Group Name Attribute’ – Attribute to use to extract group name (i.e. cn). Optional. If not set, the entire DN is used.

‘Group Member Attribute’ – Attribute to use to define group membership (i.e. member). Optional. If not set group membership will not be calculated through the groups. Will rely on group membership being defined through ‘User Group Name Attribute’ if set. The value of this property is the name of the attribute in the group ldap entry that associates them with a user. The value of that group attribute could be a dn or memberUid for instance. What value is expected is configured in the ‘Group Member Attribute – Referenced User Attribute’. (i.e. member: cn=User 1,ou=users,o=nifi vs. memberUid: user1)

‘Group Member Attribute – Referenced User Attribute’ – If blank, the value of the attribute defined in ‘Group Member Attribute’ is expected to be the full dn of the user. If not blank, this property will define the attribute of the user ldap entry that the value of the attribute defined in ‘Group Member Attribute’ is referencing (i.e. uid). Use of this property requires that ‘User Search Base’ is also configured. (i.e. member: cn=User 1,ou=users,o=nifi vs. memberUid: user1)

NOTE: Any identity mapping rules specified in nifi.properties will also be applied to the user identities. Group names are not mapped.
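
To make the two membership styles concrete, here is what a group entry could look like in each case (hypothetical LDIF entries, reusing the values from the examples quoted above):

    # membership defined with full DNs ('member'), typical of groupOfNames
    dn: cn=team1,ou=groups,o=nifi
    objectClass: groupOfNames
    cn: team1
    member: cn=User 1,ou=users,o=nifi

    # membership defined with an identifier ('memberUid'), typical of posixGroup
    dn: cn=team1,ou=groups,o=nifi
    objectClass: posixGroup
    gidNumber: 1000
    cn: team1
    memberUid: user1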

Please find more information in the documentation here.

To summarize the new authorizers.xml file structure, here is an image:

Screen Shot 2017-12-22 at 6.25.03 PM

Now that we have discussed the technical details, let’s demo it. I’ll reuse Apache Directory Studio to set up a local LDAP server, as I did in my article about LDAP authentication with NiFi. I’ll skip the details (please refer to that article if needed) and create the following structure:

Screen Shot 2017-12-22 at 4.20.38 PM.png

In a group, I have:

Screen Shot 2017-12-22 at 4.21.43 PM

And for a user, I have:

Screen Shot 2017-12-22 at 4.22.25 PM

Note that I’m using a very bad hack: by default, the ‘memberOf’ attribute is not available unless you define additional objectClasses. As a workaround, I’m using the ‘title’ attribute to represent the membership of a user in different groups. It’s quick and dirty, but it’ll do for this demo.
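
For reference, a user entry in this demo therefore looks roughly like the following (hypothetical LDIF; the inetOrgPerson objectClass is used so that the ‘title’ attribute is allowed):

    dn: cn=user1,ou=people,dc=nifi,dc=com
    objectClass: inetOrgPerson
    cn: user1
    sn: User
    # quick and dirty hack: 'title' holds the DN of the group the user belongs to
    title: cn=nifi,ou=groups,dc=nifi,dc=com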

Now, here is my authorizers.xml file:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<authorizers>
  <userGroupProvider>
    <identifier>file-user-group-provider</identifier>
    <class>org.apache.nifi.authorization.FileUserGroupProvider</class>
    <property name="Users File">./conf/users.xml</property>
    <property name="Legacy Authorized Users File"></property>
    <property name="Initial User Identity 1"></property>
  </userGroupProvider>

  <userGroupProvider>
    <identifier>ldap-user-group-provider</identifier>
    <class>org.apache.nifi.ldap.tenants.LdapUserGroupProvider</class>
    <property name="Authentication Strategy">SIMPLE</property>

    <property name="Manager DN">uid=admin,ou=system</property>
    <property name="Manager Password">secret</property>

    <property name="Referral Strategy">FOLLOW</property>
    <property name="Connect Timeout">10 secs</property>
    <property name="Read Timeout">10 secs</property>

    <property name="Url">ldap://localhost:10389</property>
    <property name="Page Size"></property>
    <property name="Sync Interval">30 mins</property>

    <property name="User Search Base">ou=people,dc=nifi,dc=com</property>
    <property name="User Object Class">person</property>
    <property name="User Search Scope">ONE_LEVEL</property>
    <property name="User Search Filter">(title=cn=nifi,ou=groups,dc=nifi,dc=com)</property>
    <property name="User Identity Attribute">cn</property>
    <property name="User Group Name Attribute">title</property>
    <property name="User Group Name Attribute - Referenced Group Attribute"></property>

    <property name="Group Search Base"></property>
    <property name="Group Object Class">group</property>
    <property name="Group Search Scope">ONE_LEVEL</property>
    <property name="Group Search Filter"></property>
    <property name="Group Name Attribute">cn</property>
    <property name="Group Member Attribute"></property>
    <property name="Group Member Attribute - Referenced User Attribute"></property>
  </userGroupProvider>

  <userGroupProvider>
    <identifier>composite-configurable-user-group-provider</identifier>
    <class>org.apache.nifi.authorization.CompositeConfigurableUserGroupProvider</class>
    <property name="Configurable User Group Provider">file-user-group-provider</property>
    <property name="User Group Provider 1">ldap-user-group-provider</property>
  </userGroupProvider>

  <accessPolicyProvider>
    <identifier>file-access-policy-provider</identifier>
    <class>org.apache.nifi.authorization.FileAccessPolicyProvider</class>
    <property name="User Group Provider">composite-configurable-user-group-provider</property>
    <property name="Authorizations File">./conf/authorizations.xml</property>
    <property name="Initial Admin Identity">admin</property>
    <property name="Legacy Authorized Users File"></property>
    <property name="Node Identity 1"></property>
 </accessPolicyProvider>

  <authorizer>
    <identifier>managed-authorizer</identifier>
    <class>org.apache.nifi.authorization.StandardManagedAuthorizer</class>
    <property name="Access Policy Provider">file-access-policy-provider</property>
  </authorizer>
</authorizers>

In this case, I decide to go through the users defined in my ‘people’ OU, to filter only the users belonging to the ‘nifi’ group and to use the ‘cn’ attribute as the username. I also specify that the ‘title’ attribute is the group membership of a user. This way, NiFi is able to do the mapping between the users and groups. Note that my ‘admin’ user that I defined as my initial admin identity is in my LDAP server, and I don’t need to define it in the File User Group provider definition.

When starting NiFi and connecting to it as the ‘admin’ user, I can go to the Users view and see the following:

Screen Shot 2017-12-22 at 4.37.29 PM

Note that the button to add users and groups is available since I used the Composite Configurable User Group provider and defined the File User Group provider. That’s how I would specify my nodes as users if I don’t want to have the servers in my LDAP/AD.

Also note that this will automatically be synchronized with LDAP/AD based on the “Sync Interval” you specified in the authorizers configuration file.

Finally, as mentioned in the docs, remember that the order is important when using composite providers in case you have users/groups collisions between multiple sources.

With this configuration, I no longer have to care about defining users and groups in NiFi and I can directly create my policies. It’s much more efficient to manage everything when people leave or change projects. Cool, isn’t it?

Let me know if you have any comment/question.

XML data processing with Apache NiFi

I recently had to work on a NiFi workflow to process millions of XML documents per day, one of the steps being the conversion of the XML data into JSON. It raises the question of performance and I will briefly share my observations in this post.

The two most natural approaches to convert XML data with Apache NiFi are:

  • Use the TransformXML processor with an XSLT file
  • Use a scripted processor or a custom Java processor relying on a library

There are a few XSLT files available on the internet providing a generic way to transform any XML into a JSON document. That’s really convenient and easy to use. However, depending on your use case, you might need specific features.

In my case, I’m processing a lot of XML files based on the same input schema (XSD) and I want the output to be compliant with the same Avro schema (in order to use the record-oriented processors in NiFi). The main issue is to force the generation of an array when you only have one single element in your input.

XSLT approach

Example #1:

<MyDocument>
  <MyList>
    <MyElement>
      <Text>Some text...</Text>
      <RecordID>1</RecordID>
    </MyElement>
    <MyElement>
      <Text>Some text...</Text>
      <RecordID>2</RecordID>
    </MyElement>
  </MyList>
</MyDocument>

This XML document will be converted into the following JSON:

{
   "MyDocument" : {
     "MyList" : {
       "MyElement" : [ {
           "Text" : "Some text...",
           "RecordID" : 1
         }, {
           "Text" : "Some text...",
           "RecordID" : 2
         } ]
      }
   }
}

Example #2:

However, if you have the following XML document:

<MyDocument>
  <MyList>
    <MyElement>
      <Text>Some text...</Text>
      <RecordID>1</RecordID>
    </MyElement>
  </MyList>
</MyDocument>

The document will be converted into:

{
  "MyDocument" : {
    "MyList" : {
      "MyElement" : {
        "Text" : "Some text...",
        "RecordID" : 1
      }
    }
  }
}

Force array

And here start the problems… because we don’t end up with the same Avro schema. That is why I recommend using the XSLT file provided by Bram Stein here on GitHub. It provides a way to force the creation of an array. To do that, you need to insert an attribute into your XML input file. The attribute to insert is

json:force-array="true"

But for this attribute to be correctly interpreted, you also need to specify the corresponding namespace:

xmlns:json="http://json.org/"

In the end, using ReplaceText processors with regular expressions, you need to have the following input (for the example #2):

<MyDocument xmlns:json="http://json.org/">
  <MyList>
    <MyElement json:force-array="true">
      <Text>Some text...</Text>
      <RecordID>1</RecordID>
    </MyElement>
  </MyList>
</MyDocument>

And this will give you:

{
  "MyDocument" : {
    "MyList" : {
      "MyElement" : [ {
        "Text" : "Some text...",
        "RecordID" : 1
      } ]
    }
  }
}

And now I do have the same schema describing my JSON documents. Conclusion: you need to use regular expressions to add a namespace on the first tag of your document and to add the json:force-array attribute on every tag wrapping elements that should be part of an array.
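
To illustrate what the ReplaceText processors do, here is a minimal Groovy sketch of the two substitutions (the element names are those of example #2 and the regular expressions are simplified; in NiFi this is done with ReplaceText processors, not with a script):

def xml = '''<MyDocument>
  <MyList>
    <MyElement>
      <Text>Some text...</Text>
      <RecordID>1</RecordID>
    </MyElement>
  </MyList>
</MyDocument>'''

// 1. declare the json namespace on the root element
xml = xml.replaceFirst('<MyDocument>', '<MyDocument xmlns:json="http://json.org/">')
// 2. force every MyElement to be rendered as a JSON array
xml = xml.replaceAll('<MyElement>', '<MyElement json:force-array="true">')

println xml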

Java approach

Now, let’s assume you’re not afraid of using scripted processors or developing your own custom processor. Then it’s really easy to have a processor do the same thing using a Java library like org.json (note that this library is *NOT* Apache-friendly in terms of licensing, which is why the following code cannot be released with Apache NiFi). Here is an example of a custom processor doing the conversion. And here is a Groovy version for the ExecuteScript processor.
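
To give an idea of what such a script looks like, here is a minimal Groovy sketch for the ExecuteScript processor (this is not the exact script linked above; it assumes the org.json jar is available via the Module Directory property):

import org.apache.nifi.processor.io.StreamCallback
import org.json.XML
import java.nio.charset.StandardCharsets

def flowFile = session.get()
if (flowFile == null) return

try {
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        // read the XML content and convert it to JSON using org.json
        def xml = inputStream.getText(StandardCharsets.UTF_8.name())
        def json = XML.toJSONObject(xml)
        outputStream.write(json.toString(2).getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
    flowFile = session.putAttribute(flowFile, 'mime.type', 'application/json')
    session.transfer(flowFile, REL_SUCCESS)
} catch (Exception e) {
    log.error('XML to JSON conversion failed', e)
    session.transfer(flowFile, REL_FAILURE)
}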

What about arrays with this solution? Guess what… It’s kind of similar: you have to use a ReplaceText processor before and after to ensure that arrays are arrays in the JSON output for any number of elements in your input. You might also have to do some other transformations, like removing the namespaces or replacing empty strings ("") by null values (by default, everything will be converted to an empty string although you might want a null value instead).

To force arrays, the easiest approach is to double every tag that should be converted into an array. With the example #2, I transform my input to have:

<MyDocument>
  <MyList>
    <MyElement /><MyElement>
      <Text>Some text...</Text>
      <RecordID>1</RecordID>
    </MyElement>
  </MyList>
</MyDocument>

It’ll give me the following JSON:

{
  "MyDocument" : {
    "MyList" : {
      "MyElement" : [ "", {
        "Text" : "Some text...",
        "RecordID" : 1
      } ]
    }
  }
}

And, then, I can use another ReplaceText processor to remove the unwanted empty strings created by the conversion.

Conclusion: with both approaches you’ll need to be a bit intrusive in your data to get the expected results. What about performance now?

Benchmark

I remove the ReplaceText processors from the equation as I usually need the same amount of regular expression work in both cases. I want to only focus on the three conversion options: the TransformXML processor with the generic XSLT file, the custom Java processor, and the Groovy script in the ExecuteScript processor.

I’ll compare the performance of each case using inputs of different sizes (data generated using a GenerateFlowFile processor) with the default configuration (one thread, no change on run duration, etc.) on my laptop.

Method: I’m generating as much data as possible (it’s always the same file during a single run) using the GenerateFlowFile processor. I wait at least 5 minutes to have a constant rate of processing and I take the mean over a 5-minute window of constant processing.

Screen Shot 2017-09-07 at 12.12.12 AM.png

For each run, I’m only running the GenerateFlowFile, one of the three processors I’m benchmarking, and the UpdateAttribute (used to only drop the data).

The input data used for the benchmark is a fairly complex XML document with arrays of arrays, lots of elements in the arrays, deeply nested records, etc. To reduce the input size, I’m not changing the structure but only removing elements from the arrays. In other words: the schema describing the output data remains the same for each run.

Note that the custom Java/Groovy option loads the full XML document in memory. To process very large XML documents, a streaming approach with another library would certainly be better suited.

Here are the results with input data of 5KB, 10KB, 100KB, 500KB and 1000KB. The below graph gives the number of XML files processed per second based on the input size for each solution.

Screen Shot 2017-09-07 at 10.16.45 PM

It’s clear that the custom Java processor is the most efficient one. The XSLT option is really nice when you want to do very specific transformations but it can quickly get slow. Using a generic XSLT file for XML to JSON transformation is easy and convenient but won’t be the most efficient option.

We can also notice that the Groovy option is a little bit less efficient than the Java one, but that’s expected. Nevertheless, the Groovy option provides pretty good performances and does not require building and compiling a custom processor: everything can be done directly from the NiFi UI.

To improve performance, it’s then possible to play with the “run duration” parameter and to increase the number of concurrent tasks. It’s actually quite easy to reach the I/O limits of the disks. Using a NiFi cluster and multiple disks for the content repository, it’s really easy to process hundreds of millions of XML documents per day.

If we display the performance ratio based on the file size between the XSLT solution and the Java based solution, we have:

Screen Shot 2017-09-07 at 10.28.46 PM

We can see that with very small files, the processing using the Java-based processor is about 13x more efficient than the XSLT approach. But with files over 100KB, the Java solution is about 26x more efficient. That’s because the NiFi framework does a few things before and after a flow file has been processed. When processing thousands of flow files per second, this creates a small overhead that explains the difference.

XML Record Reader

For a few versions now, Apache NiFi has included record-oriented processors. They provide a very powerful means to process record-oriented data; in particular, they allow users to process batches of data instead of doing “per-file” processing, which gives very robust and high-rate processing. While I’m writing this post there is no reader for XML data yet. However, there is a JIRA for it and it would provide a few interesting features:

  • By using a schema describing the XML data, it’d remove the need to use ReplaceText processors to handle the “array problem”.
  • It’d give the possibility to merge XML documents together to process much more data at once providing even better performances.

This effort can be tracked under NIFI-4366.

As usual, feel free to post any comment/question/feedback.

Monitoring NiFi – Scripted Reporting Task

Note – This article is part of a series discussing subjects around NiFi monitoring.

In the new release of Apache NiFi (1.2.0), you can now develop Scripted Reporting Tasks thanks to NIFI-1458. It is the same approach as with the ExecuteScript processor, for which you have tons of great examples here.

You might also want to read the following posts:

With the ScriptedReportingTask you can define your own implementation of the onTrigger() method and get access to:

  • ReportingContext context (which gives you access to various information such as events, provenance, bulletins, controller services, process groups, etc)
  • VirtualMachineMetrics vmMetrics (to access the metrics of the JVM)
  • ComponentLog log (if you want to log messages)

Let’s start with a very easy example: I want to log the number of threads inside my JVM every minute. Here is my code:

log.info("Thread count = " + vmMetrics.daemonThreadCount())

Screen Shot 2017-05-12 at 5.43.45 PM.png

And I can check in my nifi-app.log file that I do have:

2017-05-12 17:43:27,639 INFO [Timer-Driven Process Thread-5] o.a.n.r.script.ScriptedReportingTask ScriptedReportingTask[id=fd1668eb-015b-1000-1974-5ef96e1f9a8b] Thread count = 29

OK… now I won’t go into the details of all the information you can access using the “context” variable but let’s try another example…

I want to send a POST request over HTTP containing a JSON representation of the summary of my root process group.

I’m not really used to Groovy so please excuse my coding style ;-). But here is working code (available here as well):

def json = Class.forName("javax.json.Json")
def httpClients = Class.forName("org.apache.http.impl.client.HttpClients")
def contentType = Class.forName("org.apache.http.entity.ContentType")

def status = context.getEventAccess().getControllerStatus();
def factory = json.createBuilderFactory(Collections.emptyMap());
def builder = factory.createObjectBuilder();

builder.add("componentId", status.getId());
builder.add("bytesRead", status.getBytesRead());
builder.add("bytesWritten", status.getBytesWritten());
builder.add("bytesReceived", status.getBytesReceived());
builder.add("bytesSent", status.getBytesSent());
builder.add("bytesTransferred", status.getBytesTransferred());
builder.add("flowFilesReceived", status.getFlowFilesReceived());
builder.add("flowFilesSent", status.getFlowFilesSent());
builder.add("flowFilesTransferred", status.getFlowFilesTransferred());
builder.add("inputContentSize", status.getInputContentSize());
builder.add("inputCount", status.getInputCount());
builder.add("outputContentSize", status.getOutputContentSize());
builder.add("outputCount", status.getOutputCount());
builder.add("queuedContentSize", status.getQueuedContentSize());
builder.add("activeThreadCount", status.getActiveThreadCount());
builder.add("queuedCount", status.getQueuedCount());

def requestEntity = new org.apache.http.entity.StringEntity(builder.build().toString(), contentType.APPLICATION_JSON);
def httpclient = httpClients.createDefault();
def postMethod = new org.apache.http.client.methods.HttpPost("http://localhost:9999/rootStatus");
postMethod.setEntity(requestEntity);
httpclient.execute(postMethod);
httpclient.close();

Note – this should be improved to, for instance, properly handle potential exceptions.

To get this code working, I also have to specify the required dependencies (JSON, Apache HTTP, etc). For that, in the Module Directory property of the reporting task, I gave the following paths (because I’m lazy, I am pointing to more dependencies than required):

  • /var/lib/nifi/work/nar/extensions/nifi-standard-nar-1.2.0.nar-unpacked/META-INF/bundled-dependencies/
  • /var/lib/nifi/work/nar/extensions/nifi-site-to-site-reporting-nar-1.2.0.nar-unpacked/META-INF/bundled-dependencies/

In my example I’m sending my JSON payload with a POST HTTP request to localhost on port 9999 with the path rootStatus. To receive the request, I started a ListenHttp processor with the following configuration:

Screen Shot 2017-05-12 at 8.00.31 PM.png

Once my reporting task is started, I start receiving the information as flow files:

Screen Shot 2017-05-12 at 8.08.55 PM.png

This scripted reporting task allows you to quickly develop proofs of concept to send information to your internal systems using the interfaces you want. However, depending on your needs, it might be more interesting to develop your own reporting task in Java and to build the corresponding NAR. It will give you more flexibility/options (you’ll be able to implement more interfaces) and better performance.

As usual, feel free to ask questions and comment on this post.

Monitoring NiFi – Ambari & Grafana

Note – This article is part of a series discussing subjects around NiFi monitoring.

When using Apache NiFi (note that version 1.2.0 is now released!) as part of HDF, a lot of things are simplified using Apache Ambari to deploy NiFi and manage its configuration. Also, using the Ambari Metrics service and Grafana, you have a way to easily and visually monitor NiFi performance. And you can also use Apache Ranger to centralize the authorization management for multiple components (NiFi, Kafka, etc) in one single place.

This article will discuss how you can use Ambari Metrics and Grafana to improve your NiFi monitoring. Let’s start with a quick discussion around AMS (Ambari Metrics System). By default this service is running a Metrics Collector with an embedded HBase instance (and a Zookeeper instance) to store all the metrics, and Ambari will also deploy Metrics Monitor instances on all the nodes of the cluster. The monitors will collect the metrics at system level and send the metrics to the collector. However, the collector also exposes a REST API and that’s what NiFi is going to use with the AmbariReportingTask.

GrafanaBlogOverview

Source and documentation is on the Hortonworks website here.

When using HDF, the Ambari Reporting task should be already up and running for you. If not, you can add it and configure it with a frequency of one minute (it does matter) and use the following parameters:

Screen Shot 2017-05-11 at 9.30.46 AM

Note that “ambari.metrics.collector.url” is a variable already set for you when Ambari starts NiFi. You could also directly provide the address, in my case:

http://pvillard-hdf-1:6188/ws/v1/timeline/metrics
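
In practice, the configuration of the reporting task essentially boils down to the following properties (these are the values I use; the property names come from the AmbariReportingTask and should be double-checked against its documentation):

Metrics Collector URL: ${ambari.metrics.collector.url}
Application ID: nifi
Hostname: ${hostname(true)}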

Once this reporting task is up and running, you should be able to see the metrics on the NiFi service page in Ambari:

Screen Shot 2017-05-11 at 9.38.57 AM.png

Also, you can go into Grafana to display dashboards with the metrics of your components. You have pre-configured dashboards and here is the one for NiFi:

Screen Shot 2017-05-11 at 9.46.03 AM.png

Now, all the metrics we have here are at cluster level. We are not able to display metrics for specific workflows. With the latest release of Apache NiFi (1.2.0), there is now an optional parameter in the AmbariReportingTask to specify a process group ID. This way, by creating a second reporting task (keep the one providing cluster-level metrics) and by specifying the ID of a specific process group, you can actually create your Grafana dashboards at workflow level.

Let’s say I’ve the following workflow:

Screen Shot 2017-05-11 at 9.52.49 AM

And inside my process group, I have:

Screen Shot 2017-05-11 at 9.52.59 AM.png

Now, since my process group has the ID “75973b6e-2d38-1cf3-ffff-fffffdea8cbc”, I can define the following Ambari reporting task:

Screen Shot 2017-05-11 at 9.54.50 AM.png

Note – you must keep “nifi” as the Application ID as it has to match the configuration of the Ambari Metrics System.

Once your reporting task is running, in Grafana, you can create your own dashboard for this workflow and display the metrics you want:

Screen Shot 2017-05-11 at 10.08.59 AM.png

For my Kafka example, here is the dashboard I defined:

Screen Shot 2017-05-11 at 10.39.47 AM.png

In this example, I can see that my workflow is running fine but the free disk space on one of my nodes is decreasing very quickly. It turns out that when the disk is completely filled, back pressure will be enabled in my workflow and no more data will be sent to Kafka. Instead, data will be queued in NiFi.

This simple example gives me a lot of information:

  • Everything is default configuration in Ambari and I chose my three NiFi nodes to also host Kafka brokers. By default, for Kafka, the replication factor is set to 1, the number of partitions is set to 1 and the automatic creation of topic is allowed (that’s why I didn’t need to create the topic before starting my workflow). Because of the default parameters, all of the data is sent to only one Kafka broker (pvillard-hdf-2) and that’s why the disk space is quickly decreasing on this node since my three NiFi nodes are sending data to this broker.
  • Also, we clearly see that it’s not a good idea to colocate NiFi and Kafka on the same nodes since they are both I/O intensive. In this case, they are using the same disk… and we can see that the task duration (for NiFi) is clearly higher on the Kafka node that is receiving the data (pvillard-hdf-2). Long story short: keep NiFi and Kafka on separate nodes (or at the very least with different disks).

HDF and the Ambari Metrics System give you the ability to create relevant custom dashboards for specific use cases. They also allow you to mix information from Kafka, from NiFi and from the hosts to have all the needed information in one single place.

Also, by using the REST API of the Metrics Collector (you may be interested in this article), you could send your own data (not only the data gathered at the process group level) to add more information to your dashboards. An example that comes to mind would be to send the lineage duration (see Monitoring of Workflow SLA) at the end of the workflow using an InvokeHTTP processor and sending a JSON payload using a POST request to the API endpoint.

Let’s say I want to monitor how long it takes between my GenerateFlowFile and the end of my workflow to check if some particular events are taking longer. Then I could have something like:

Screen Shot 2017-05-11 at 5.58.02 PM.png

What am I doing here? I want to send to AMS the information about the lineage duration of the flow files I sent into my Kafka topic. However I don’t want to send the duration of every single event (that’s not really useful and it’s going to generate a lot of requests/data). Instead I want to make an API call only once per minute. The idea is to compute the mean and max of the lineage duration with a rolling window of one minute and to only send this value to AMS.

I could use the new AttributeRollingWindow processor but it is not as fast as the PublishKafka and I don’t want to generate back pressure in my relationships. So I use the InvokeScriptedProcessor to build my own rolling processor (it’s faster because I am not using any state information):

  • this processor takes a frequency duration as a parameter (that I’ll set to 1 minute in this example)
  • for every flow file coming in, it will extract the lineage start date to compute the max and mean lineage duration over the rolling window. If the last flow file sent to the success relationship was less than one minute ago, I route the flow file to the drop relationship (which I set to auto-terminated). If it was more than one minute ago, I update the attributes of the current flow file with the mean and max of all the flow files since the last “success” flow file and route this flow file to the success relationship

Since I have flow files coming into my processor at a high rate, I know that my processor will release one flow file every minute with the mean and max of the lineage duration for the flow files of the last minute.

Then I use a ReplaceText processor to construct the JSON payload that I’ll send to the Metrics Collector using the InvokeHttp processor.

Here is the configuration of the InvokeScriptedProcessor:

Screen Shot 2017-05-11 at 7.54.00 PM

The Groovy script used can be found here.

Then I create the JSON payload with the ReplaceText processor:

Screen Shot 2017-05-11 at 7.56.33 PM.png

Note that I use the ID of the processor as the “instanceid” attribute in the JSON.
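
For reference, the payload posted to the Metrics Collector follows the AMS timeline metrics format and might look roughly like the following (the values are illustrative and the metric name is simply the one I chose for this example):

{
  "metrics": [ {
    "metricname": "lineage_duration_max",
    "appid": "nifi",
    "instanceid": "<ID of the scripted processor>",
    "hostname": "pvillard-hdf-1",
    "starttime": 1494518000000,
    "metrics": { "1494518000000": 250 }
  } ]
}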

Then, I use the InvokeHttp processor (with a scheduling/frequency of 1 minute):

Screen Shot 2017-05-11 at 7.57.45 PM.png

Now, I can use this information to build the corresponding graph in my Grafana dashboard:

Screen Shot 2017-05-11 at 7.59.27 PM

I can see that, on average, it takes about 150 milliseconds to generate my flow file, publish it in my Kafka topic and get it into my scripted processor. I could also generate one metric per host of my cluster to check if a node is performing badly compared to the others.

Now you can easily send your custom data into AMS and create dashboards for your specific use cases and workflows.

As usual, feel free to ask questions and comment on this post.

Monitoring NiFi – Workflow SLA

Note – This article is part of a series discussing subjects around NiFi monitoring.

Depending on the workflows and use cases, you may want to retrieve some metrics per use case/workflow instead of high-level metrics. One of the things you could be looking for is workflow SLA.

First of all, we need to agree on what is “Workflow SLA”. In this article, by SLA (Service-Level Agreement) monitoring I mean that I want to ensure that a workflow is processing every single event quickly enough. In other words, if an event took a long time to be processed, I want to be aware of that so I can understand what is going on. A workflow could be perfectly running in NiFi (no bulletin generated, no error) but the processing time could get bigger because of external causes (resources exhaustion, network bandwidth with external systems, abnormal event size, etc).

Luckily for us, every single flow file within NiFi contains two core attributes that are perfectly designed for our needs. From the documentation:

  • entryDate: The date and time at which the FlowFile entered the system (i.e., was created). The value of this attribute is a number that represents the number of milliseconds since midnight, Jan. 1, 1970 (UTC).
  • lineageStartDate: Any time that a FlowFile is cloned, merged, or split, this results in a “child” FlowFile being created. As those children are then cloned, merged, or split, a chain of ancestors is built. This value represents the date and time at which the oldest ancestor entered the system. Another way to think about this is that this attribute represents the latency of the FlowFile through the system. The value is a number that represents the number of milliseconds since midnight, Jan. 1, 1970 (UTC).

In our case we are really interested in the “lineageStartDate” attribute, but based on your specific use cases and needs you could also be interested in “entryDate”.

At the end of the workflow I want to monitor, I can use a RouteOnAttribute processor to route the event if and only if the duration since the lineage start date is over a given threshold (let’s say one minute for the example below). And if an event took longer than expected, I route the flow file to a PutEmail processor to send an email and receive a notification.

Screen Shot 2017-05-10 at 3.33.34 PM.png

My RouteOnAttribute configuration:

Screen Shot 2017-05-10 at 3.34.02 PM.png

This way, at a workflow level, you are able to check if the processing time of an event is meeting your requirements. You could obviously follow the same approach to check the processing time at multiple critical points of the workflow.

Note that, instead of sending an email notification, you could perfectly use any other kind of processor to integrate this alert/event with any system you are using on your side.

One last comment: I recommend checking out the article about the Ambari Reporting Task and Grafana where I also discuss workflow SLA and how to send the metrics to the Ambari Metrics System and display the information in a Grafana dashboard.

As usual, feel free to ask questions and comment on this post.

Monitoring NiFi – Site2Site reporting tasks

Note – This article is part of a series discussing subjects around NiFi monitoring.

If we look at the development documentation about reporting tasks:

So far, we have mentioned little about how to convey to the outside world how NiFi and its components are performing. Is the system able to keep up with the incoming data rate? How much more can the system handle? How much data is processed at the peak time of day versus the least busy time of day?

In order to answer these questions, and many more, NiFi provides a capability for reporting status, statistics, metrics, and monitoring information to external services by means of the ReportingTask interface. ReportingTasks are given access to a host of information to determine how the system is performing.

Out of the box, you have quite a lot of available reporting tasks and, in this article, we are going to focus on a few of them (have a look at the other articles to find out more about the other reporting tasks).

Before going into the details of the Site-to-Site related reporting tasks, we need to understand what Site-to-Site (S2S) is:

When sending data from one instance of NiFi to another, there are many different protocols that can be used. The preferred protocol, though, is the NiFi Site-to-Site Protocol. Site-to-Site makes it easy to securely and efficiently transfer data to/from nodes in one NiFi instance or data producing application to nodes in another NiFi instance or other consuming application.

In this case, we are going to use S2S reporting tasks: the reporting tasks run continually to collect data and send it to a remote NiFi instance, but you can use S2S to send data to the very instance running the reporting task as well. This way, by using an input port on the canvas, you can actually receive the data generated by the reporting task and use it in a NiFi workflow. That’s really powerful because, now, you can use all the NiFi capabilities to process this data and do whatever you want with it.

Let’s go over some examples!

  • Monitoring bulletins

With the new version of Apache NiFi (1.2.0), you can transform every bulletin into a flow file sent to any NiFi instance using Site-to-Site. This way, as soon as a processor / controller service / reporting task generates a bulletin, it can be converted into a flow file that you can use to feed any system you want.

Let’s configure this reporting task to send the bulletins (as flow files) to an input port called “bulletinMonitoring” and use the flow files to send emails.

First, since my NiFi cluster is secured, I create a StandardSSLContextService in the Controller Services tab of the Controller Settings menu (this way, it can be used by reporting tasks).

Screen Shot 2017-05-10 at 12.28.54 PM.png

Screen Shot 2017-05-10 at 12.28.46 PM.png

Then, I can define my reporting task by adding a new SiteToSiteBulletinReportingTask:

Screen Shot 2017-05-10 at 12.30.55 PM.png

Screen Shot 2017-05-10 at 12.31.29 PM.png

Before starting the task, on my canvas, I have the following:

Screen Shot 2017-05-10 at 12.32.55 PM.png

Note – in a secured environment, you need to set the correct permissions on the components. You need to allow NiFi nodes to receive data via site-to-site on the input port and you also need to grant the correct permissions on the root process group so the nodes are able to see the component, view and modify the data.

I configured my PutEmail processor to send emails using the Gmail SMTP server:

Screen Shot 2017-05-10 at 12.45.13 PM.png

Now, as soon as bulletins are generated, I’ll receive an email notification containing my message with the attributes of the flow file, and the bulletins will be attached to the email.

Obviously, instead of sending emails, you could, for example, use some other processors to automatically open tickets in your ticketing system (like JIRA using REST API).

  • Monitoring disk space

Now, in combination with the task we previously set up, we can take advantage of the reporting task monitoring disk space. This reporting task will generate WARN logs (in the NiFi log file) and bulletins when the monitored disk partition is used over a custom threshold. In case I want to monitor the Content Repository, I could configure my reporting task as below:

Screen Shot 2017-05-10 at 1.28.24 PM.png

Screen Shot 2017-05-10 at 1.28.34 PM.png

Using the combination of this Reporting Task and the SiteToSiteBulletinReportingTask, I’m able to generate flow files when the disk utilization is reaching a critical point and to receive notifications using all the processors I want.

  • Monitoring memory use

The same approach can be used to monitor the memory utilization within the NiFi JVM using the MonitorMemory reporting task. Have a look at the documentation of this reporting task here.

  • Monitoring back pressure on connections

There is also the SiteToSiteStatusReportingTask that will send details about the NiFi Controller status over Site-to-Site. This can be particularly useful to be notified when some processors are stopped or queues are full (and back pressure is enabled), or to build reports regarding NiFi performance. This reporting task will be slightly improved regarding back pressure (with NIFI-3791). In the meantime, if you want to receive notifications when back pressure is enabled on any connection, here is what you can do (assuming you know the back pressure thresholds):

Screen Shot 2017-05-10 at 2.07.44 PM.png

Screen Shot 2017-05-10 at 2.09.14 PM.png

Note that I configured the task to only send data about the connections but you can receive information for any kind of component.

And I use the following workflow: my input port receives the flow files with the controller status (containing an array of JSON elements for all of my connections); then I split the array using a SplitJson processor; then I use EvaluateJsonPath to extract the queuedCount and queuedBytes values as attributes; and then I use a RouteOnAttribute processor to check if one of the two attributes is greater than or equal to my thresholds. If that’s the case, I send the notification by email.

Screen Shot 2017-05-10 at 2.31.23 PM.png

My RouteOnAttribute configuration:

Screen Shot 2017-05-10 at 3.33.46 PM
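
For illustration, the EvaluateJsonPath properties are simply JsonPath expressions matching the field names mentioned above, and the RouteOnAttribute rule compares the resulting attributes to the back pressure thresholds (here assuming the default thresholds of 10,000 flow files and 1 GB):

queuedCount : $.queuedCount
queuedBytes : $.queuedBytes

backpressure : ${queuedCount:ge(10000):or(${queuedBytes:ge(1073741824)})}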

Site-to-Site reporting tasks are really useful and there are many ways to use the data they can send for monitoring purposes.

  • SiteToSiteProvenanceReportingTask

Note that you also have a Site-to-Site reporting task to send all the provenance events over S2S. This can be really useful if you want to send this information to an external system.

While monitoring a dataflow, users often need a way to determine what happened to a particular data object (FlowFile). NiFi’s Data Provenance page provides that information. Because NiFi records and indexes data provenance details as objects flow through the system, users may perform searches, conduct troubleshooting and evaluate things like dataflow compliance and optimization in real time. By default, NiFi updates this information every five minutes, but that is configurable.

Besides, with NIFI-3859, this could also be used in a monitoring approach to only look for specific events according to custom parameters. It could be used, for instance, to check how long it takes for an event to go through NiFi and to raise alerts in case an event took an unusually long time to be processed (have a look at the next article to see how this can be done differently).

As usual, feel free to ask questions and comment on this post.

Monitoring NiFi – Logback configuration

Note – This article is part of a series discussing subjects around NiFi monitoring.

There is one configuration file in NiFi that manages all the logging operations performed by NiFi: this file is in the configuration directory (NIFI_HOME/conf) and is named logback.xml. It is good to know that this file can be modified “on-the-fly” and that NiFi does not need to be restarted for the modifications to be taken into account.

This file is a common configuration file for the logback library which is a successor of the famous log4j project. Here is the official website. I won’t get into the details of logback itself (documentation is here) but here is a quick overview of the default configuration when using NiFi.

The most important log file in NiFi is certainly nifi-app.log which is created according to this configuration block:

    <appender name="APP_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${org.apache.nifi.bootstrap.config.log.dir}/nifi-app.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>${org.apache.nifi.bootstrap.config.log.dir}/nifi-app_%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
            <maxFileSize>100MB</maxFileSize>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <immediateFlush>true</immediateFlush>
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="APP_FILE"/>
    </root>

In this case, the log file rolls every hour or when it goes over 100MB, and we only keep up to 30 files’ worth of history. Based on your retention policies, you can update this file to meet your requirements.

Also note that, by default, you will have the following log files:

  • ./logs/nifi-app.log
  • ./logs/nifi-bootstrap.log
  • ./logs/nifi-user.log

Now… what can we do to use this file for monitoring purposes? Here are some options:

  • We can use the TailFile processor to let NiFi tail its own log file and perform the required operations when some conditions are met. For example, to send an HTTP request when WARN and ERROR level logs are detected:

Screen Shot 2017-05-02 at 9.05.14 PM.png

  • We can also define new appenders in the log configuration file and change it according to our needs. In particular, we could be interested in the SMTP Appender that can send logs via email based on quite a large set of conditions (see the example after this list). Full documentation here.
  • Obviously you can also configure this configuration file so that NiFi log files integrate with your existing systems. An idea could be to configure a Syslog appender to also redirect the logs to an external system.
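
As an illustration, a minimal SMTP appender sending an email for every ERROR message could look like the following (the SMTP host, addresses and threshold obviously have to be adapted, and the appender must be referenced from a logger or from the root element to be active):

    <appender name="SMTP_ALERT" class="ch.qos.logback.classic.net.SMTPAppender">
        <smtpHost>smtp.example.com</smtpHost>
        <to>ops-team@example.com</to>
        <from>nifi@example.com</from>
        <subject>NiFi alert: %logger{20} - %m</subject>
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
        </layout>
        <!-- only send an email for ERROR events -->
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>ERROR</level>
        </filter>
    </appender>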

A few remarks regarding the logs, as I see recurrent questions:

  • At the moment, there is no option to have a log file per processor or per process group. It is discussed by NIFI-3065. However such an approach could have performance implications since each log message would need to be routed to the correct log file.
  • Also, at the moment, the custom name of the processor is not displayed in the log messages (only the type is used). It is discussed by NIFI-3877. Right now it looks like:

2017-05-12 10:43:37,780 INFO [Timer-Driven Process Thread-4] o.a.nifi.processors.standard.InvokeHTTP InvokeHTTP[id=f7ebb153-015b-1000-f590-599786e16340] my log message…

Having the custom component names in the logs could be a solution in a multi-tenant environment (to get one log file per workflow/use case), assuming each “team” follows the same convention to name the components in the workflows.

As usual, feel free to ask questions and comment on this post.