NiFi 1.7+ – Terminate threads

One of the new features coming with NiFi 1.7.0 is the possibility to terminate stuck threads directly from the UI. Before this release, when you had a processor getting stuck (like a custom processor with a deadlock) you had no option but to restart NiFi… and that’s not really great in a multi-tenant setup.

Let’s see this new feature with an example: I’m using a GenerateFlowFile and, using the debug mode of my IDE, I’m going to simulate an issue with the thread by adding a breakpoint in the onTrigger() method of the processor.

Screen Shot 2018-07-02 at 2.56.57 PM.png

When the processor is running, we can see the number of threads currently used by the processor (top right). If the number of tasks is not increasing (and equal to 0 after 5 minutes) and the number of threads is constant, it probably means you have a stuck thread. In this case, it is always recommended to do a thread dump of NiFi to see what is going on. To do that:

./bin/nifi.sh dump /tmp/thread-dump.txt

If we look at the content of the generated file and search for my GenerateFlowFile processor, we can see something like:

"Timer-Driven Process Thread-5" Id=57 RUNNABLE (suspended)
 at org.apache.nifi.processors.standard.GenerateFlowFile.onTrigger(GenerateFlowFile.java:210)
 at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
 at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
 at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
 at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 Number of Locked Synchronizers: 1
 - java.util.concurrent.ThreadPoolExecutor$Worker@2388307

In this case, the thread is shown as “suspended” at the line where I put my breakpoint in my IDE.

Note that having a stuck thread is usually a symptom of a badly designed processor or an underlying issue with the code dependencies of the processor. Having the thread dump can help locate and fix the issue.

If I try to stop the processor:

Screen Shot 2018-07-02 at 3.02.23 PM.png

I’ll now see two threads being used by the processor:

Screen Shot 2018-07-02 at 3.03.09 PM

The processor is now in the process of being stopped but you won’t be able to update or restart the processor until it is actually stopped. However, if the initial thread is stuck, you’ll remain in this state forever (and, eventually, you’ll have to restart NiFi if you’re running a version below 1.7.0).

With NiFi 1.7.0, you now have the possibility to terminate the thread (meaning you don’t have to wait for the thread to be completely stopped):

Screen Shot 2018-07-02 at 3.06.58 PM

If I terminate the thread, here is what I’ll see:

Screen Shot 2018-07-02 at 3.08.07 PM

This means there is no longer an active thread and that one thread is in the process of being terminated.

Under the hood, the framework issues an “interrupt” for the thread and performs a “reload” of the processor, meaning a new instance of the processor class is created. This allows the old instance to eventually shut down gracefully, close connections, etc. However, this needs to be used with care: if the class maintains some values in internal variables, this information will be lost in the process (I’m not talking about state information that can be saved at framework level by the processors).

An interrupt is an indication to a thread that it should stop what it is doing and do something else. It’s up to the programmer to decide exactly how a thread responds to an interrupt, but it is very common for the thread to terminate. However, if, for some reason, the thread does not respond to the interrupt, it will remain in the terminating state forever (in my example above, the thread stays in that state because of my breakpoint).

You can now update the configuration and start the processor again (even if a thread is still being terminated). Once the thread is terminated, you’ll get back to a nominal situation:

Screen Shot 2018-07-02 at 4.54.51 PM.png

If you don’t want to set up a debug environment, you can simulate a stuck thread with the following ExecuteScript processor:

Screen Shot 2018-07-02 at 4.56.49 PM.png
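
The script itself can be as simple as the following sketch (written here for the python engine of ExecuteScript; this is an illustration, not necessarily the exact script shown in the screenshot):

# Busy loop that never returns and never checks the thread's interrupt flag:
# the onTrigger() call of ExecuteScript appears as a stuck RUNNABLE thread.
while True:
    pass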

When I start and then try to stop the processor:

Screen Shot 2018-07-02 at 6.02.37 PM.png

And I’ll get the following message when terminating the thread:

Screen Shot 2018-07-02 at 6.02.58 PM.png

You can also use the DebugFlow processor with “@OnStopped Pause Time” set to something like “5 min” and “Ignore Interrupts When Paused” set to “true”:

Screen Shot 2018-07-02 at 6.49.29 PM.png

That’s it for this post! This new feature is really nice in a multi-tenant environment where you can’t afford a restart of the service. If you’re in a situation where you need to use this feature, remember to take a thread dump before actually terminating the thread. This will be really useful to investigate the issue. Also, remember that terminating a thread is not a “normal” operation, it means there is something wrong somewhere and it could very likely happen again.

Thanks for reading this post and, as always, feel free to comment / ask questions!

NiFi 1.7+ – XML Reader/Writer and ForkRecord processor

Starting with NiFi 1.7.0 and thanks to the work done by Johannes Peter on NIFI-4185 and NIFI-5113, it’s now possible to use an XML reader and writer in the Record processors to help you process XML data. Before that, you had a few options requiring a bit of additional work to get things working (see here).

I won’t go into the details because the reader/writer are really well documented (have a look at the additional details of the components for examples).

Also, with NIFI-4227, a ForkRecord processor is now available to “explode” your data when you have one or multiple embedded arrays in your data set and you need to normalize your data. Have a look at the ForkRecord documentation (additional details) for examples.

Example – Introduction

Let’s go through an example using the XML Reader and the ForkRecord processor. I assume I’m receiving XML data with the following schema:

Screen Shot 2018-06-27 at 10.39.20 AM.png

And here is a dummy file I’m receiving that I’ll use for this example:

Screen Shot 2018-06-27 at 2.15.28 PM.png

The corresponding Avro schema can be found here. Defining the Avro schema corresponding to your data is the most “difficult” part but once it’s done, everything else is really easy with the Record processors. You can get some help using the InferAvroSchema processor but this should only be used to get an initial version of your Avro schema when you start developing your workflows (it should not be used in your production workflows).

I’m going to quickly explain the following workflow (template available here):

Screen Shot 2018-06-27 at 2.24.43 PM.png

Example – XML to JSON conversion

The GenerateFlowFile is used to generate my XML data and to send the content to the Record processors I’m using. The upper part of the workflow is just a ConvertRecord processor to perform the XML to JSON conversion thanks to the schema. The other parts of the workflow are used to extract each “object” of my schema (customer, address, account and transaction).

Here are the configuration details for the XML to JSON conversion. The GenerateFlowFile is very simple:

Screen Shot 2018-06-28 at 2.25.23 PM

Note the ‘schema.name’ attribute set to the name of the schema I added in my Avro schema registry controller service:

Screen Shot 2018-06-28 at 2.26.24 PM.png

The XML Reader is configured to get the schema name from the ‘schema.name’ attribute of the processed flow files:

Screen Shot 2018-06-28 at 2.27.21 PM.png

The JSON writer does not contain any specific configuration; it’ll write the data using the schema inherited at the reader level:

Screen Shot 2018-06-28 at 2.28.22 PM

The ConvertRecord processor can now be configured for the XML to JSON conversion:

Screen Shot 2018-06-28 at 2.29.20 PM.png

That’s it! You now have a very easy and powerful way to perform your XML to JSON conversion. Here is the flow file content before the processor:

Screen Shot 2018-06-27 at 2.15.28 PM

Here is the JSON generated by the processor:

Screen Shot 2018-06-28 at 2.31.23 PM

If you need to manipulate your records to go from one schema to another, you can use the UpdateRecord processor.

Note – using the XML Reader/Writer and Record processors, you can expect performance as good as the best options presented in my previous post about XML data processing.

Example – ForkRecord processor

The goal here is to extract the data contained in the arrays into separate CSV files. From one XML file, I want to generate 4 CSV files that I could use, for instance, to send data into database tables.

Let’s start with the first one regarding customer data. I’m adding a ‘customer’ Avro schema in the Avro schema registry:

Screen Shot 2018-06-28 at 2.42.58 PM.png

I’m using an UpdateAttribute processor to set the ‘output.schema’ attribute to the name of the schema I want the CSV Record Writer to use. Then, I’m just using a ConvertRecord processor to convert from XML to CSV, and I only keep the root level fields (customer data). Here is the configuration of my CSV writer:

Screen Shot 2018-06-28 at 2.45.23 PM.png

Here is the output when processing my XML data:

Screen Shot 2018-06-28 at 2.46.32 PM.png

Now, I want to extract the data contained in the arrays, and that’s where the ForkRecord processor is useful. To use it, I need to specify a custom property containing the Record path to the array that the processor should process. The processor then has two modes:

  • Split – that will preserve the input schema but will create one flow file per element contained in the array.
  • Extract – that will generate flow files using the elements of the arrays, with the possibility to include the parent fields up to the root level in case you want to keep some fields as keys/identifiers of your array elements. Note that you can define which parent fields you want to preserve by setting the appropriate schema with only the fields you want.

In our case we’re using the Extract mode and we are including the parent fields: for the ‘address’ data for instance, we want to keep the customer_id field in the output (that field would be used as a key between the customer data and the address data in the database world). Here is the schema I define in my Avro schema registry for the ‘address’ object:

Screen Shot 2018-06-28 at 2.52.03 PM

I’m using an UpdateAttribute to set the attributes I’m using in my reader and the ForkRecord processor:

Screen Shot 2018-06-28 at 2.55.04 PM

And I’m configuring the ForkRecord processor to use the ‘fork.path’ attribute defining the array of elements to be processed:

Screen Shot 2018-06-28 at 2.55.59 PM.png

Note – if you have multiple Record paths pointing to multiple arrays with elements using the same output schema, you can define as many custom properties as you want.

And here is the output after the ForkRecord processor:

Screen Shot 2018-06-28 at 2.56.40 PM

Here are the Record paths I’m setting for the other types of objects I want to extract:

  • Account – fork.path = /accounts/account
  • Transaction – fork.path = /accounts/account[*]/transactions/transaction
    In this case, I need to specify account[*] because I want to access all the transactions of all the elements in the account array. But I could use the Record path features to only access some specific accounts based on some predicates.
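    For instance, a path like /accounts/account[0]/transactions/transaction (an index-based selection, given here only as an illustration) would process the transactions of the first account of the array only.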

And here is the output I get (after defining the appropriate schema in my schema registry):

  • Account data (I only keep the customer_id from the parent fields):

Screen Shot 2018-06-28 at 3.02.14 PM.png

  • Transaction data (I only keep the customer_id and account_id from the parent fields):

Screen Shot 2018-06-28 at 3.03.55 PM

Conclusion

The XML Reader & Writer are a really nice addition to NiFi if you need to process XML data, and they will make your existing workflows much simpler! You really have to look at the Record processors and concepts as soon as you have a use case manipulating data that complies with a schema.

The ForkRecord processor is a new processor that you can use if you need to manipulate schema-oriented data containing a lot of arrays. That can be useful in case you need to normalize data before ingestion into databases.

Thanks for reading this post and, as always, feel free to comment and/or ask questions!

NiFi workflow monitoring – Wait/Notify pattern with split and merge

Thanks to NIFI-4262 and NIFI-5293, NiFi 1.7.0 contains a small improvement allowing users to extend the Wait/Notify pattern to merging situations.

If you’re not familiar with the Wait/Notify concept in NiFi, I strongly recommend reading this great post from Koji about the Wait/Notify pattern (it’ll make this post much easier to understand).

When using the Merge* processors, you have one relationship to route the flow file containing the merged data and one relationship to route the original flow files used in the merging operation. With NIFI-4262 for the MergeContent processor (and NIFI-5293 for the MergeRecord processor), the original flow files used in the merge operation will now contain an attribute with the UUID of the flow file generated by the merge operation. This way, you now have a way to link things together and that’s really useful to leverage the Wait/Notify pattern.

Use case presentation

Let’s demonstrate the feature with a use case: I’m receiving ZIP files containing multiple CSV files that I want to merge together while converting the data into Avro and send it into a file system (for simplicity here, I’ll send the data to my local file system, but it could be HDFS for instance).

The idea is the following:

ListFile -> FetchFile -> UnpackContent -> MergeRecord -> PutFile

That’s easy and it works great… BUT… I want to be able to monitor and track exactly how each ZIP file is handled and when the data contained in a ZIP file is stored in the destination. I have a requirement to maintain in real-time the following tables:

Table ‘zip_files_processing’

  • zip_file_name – unique name of the received ZIP files
  • ff_uuid – flow file UUID (useful to replay events in case of errors)
  • ingestion_date – date when the ZIP file starts being processed in NiFi
  • storage_date – date when all the data of the ZIP file has been processed
  • number_files – number of CSV files contained in the ZIP file
  • number_files_OK – number of files successfully stored in the destination
  • number_files_KO – number of files unsuccessfully processed
  • status – current status of the process for the ZIP file (IN_PROGRESS, OK, KO)

Table ‘files_ingestion’

  • zip_file_name – unique name of the zip file containing the CSV file
  • file_name – name of the CSV file
  • ff_uuid – flow file UUID (useful to replay events in case of errors)
  • storage_date – date when the CSV file has been processed
  • storage_name – name of the destination file containing the data
  • status – current status of the process for the CSV file (IN_PROGRESS, OK, KO)

That’s where the Wait/Notify is going to be extremely useful.

High level description of the workflow

Here is the main idea behind the workflow:

Screen Shot 2018-06-13 at 4.14.17 PM.png

Let’s describe the workflow at a high level:

  • List the data available and insert rows in my zip_files_processing table for each listed file so that I can track ingested data
  • Fetch the data (in case of error, I update the zip_files_processing table to set the status to KO)
  • I unpack my ZIP files to get a flow file per CSV file, and I route the original flow file to a Wait processor. The flow file representing the ZIP file will be released only once all the associated CSV files have been fully processed
  • For each CSV file, I insert a row in my files_ingestion table to track the status of each CSV file being processed
  • I merge the data together while converting the data from CSV to Avro, and I route the original flow files to a Wait processor. The flow files will be released only once the merged flow file has been sent to the final destination
  • I send the merged flow file to the destination (local file system in this case) and then use the Notify processor to release all the original flow files used to create the merged flow file
  • Then I use the released flow files to notify the first Wait processor and also release the flow file representing the corresponding ZIP file
  • With all the released flow files, I can update my monitoring tables with the appropriate information

Workflow details

Here is the full configuration of the workflow and some explanations around the parameters I used. Also, here is the template of the workflow I used – it’ll be easier to understand if you have a look, give it a try and play around with it (even without the SQL processors) – note that you will need to add a Distributed Map Cache server controller service to have the Distributed Map Cache clients working with Wait/Notify processors:

Screen Shot 2018-06-26 at 1.50.35 PM.png

Let’s start describing the ingestion part:

Screen Shot 2018-06-26 at 1.52.57 PM.png

Nothing very strange here. Just note the UpdateAttribute processor I’m using to store the original UUID to a dedicated attribute so that I don’t lose it with the UnpackContent processor (that creates new flow files with new UUIDs).

Screen Shot 2018-06-26 at 1.56.13 PM

The first PutSQL is used to insert new lines in my ZIP monitoring table for each ZIP file I’m listing. I’m executing the below query:

Screen Shot 2018-06-26 at 1.55.31 PM
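
The exact query is in the screenshot above; it essentially inserts the name of the ZIP file, the UUID of the flow file and an initial IN_PROGRESS status. As a sketch of what it could look like (my assumption, based on the table definitions given later in this post, with Expression Language references to the flow file attributes):

INSERT INTO zip_files_processing (zip_file_name, ff_uuid, status)
VALUES ('${filename}', '${uuid}', 'IN_PROGRESS');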

The second PutSQL is used in case of error when fetching the file from the local file system or when an error occurs while unpacking the archive. In such a case, I’m executing the below query:

Screen Shot 2018-06-26 at 1.57.24 PM.png
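
Conceptually, this query flags the ZIP file as failed. A sketch of what it could look like (again, an assumption based on the table definitions; the actual query is in the screenshot):

UPDATE zip_files_processing
SET status = 'KO'
WHERE zip_file_name = '${filename}';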

Let’s now focus on the second part of the workflow:

Screen Shot 2018-06-26 at 1.58.28 PM

The UnpackContent processor is used to extract the CSV files from the ZIP file. For one flow file representing the ZIP file, it will generate one flow file per CSV file contained in the ZIP file. The original flow file will then be routed to the original relationship while the flow files for the CSV data will be routed to the success relationship. The flow files containing the CSV data will have a ‘fragment.identifier’ attribute, and that’s the common attribute between the original flow file and the generated ones that I’m going to use for Wait/Notify (this attribute is automatically generated by the processor). Besides, the original flow file also has a ‘fragment.count’ attribute containing the number of generated flow files for the CSV data.

The Wait processor is configured as below:

Screen Shot 2018-06-26 at 2.03.46 PM

Basically I’m setting the identifier of the release signal to the common attribute shared between the original and the generated flow files, and I’m saying that I have to wait for ‘fragment.count’ signals before releasing the original flow file to the success relationship. Until then, the flow file is transferred to the ‘wait’ relationship, which loops back on the Wait processor. Also, I configured a 10-minute timeout in case I don’t receive the expected signals, in which case the flow file would be routed to the ‘expired’ relationship.
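
For reference, the key properties of this Wait processor are configured along these lines (a sketch matching the description above; the property names are those of the Wait processor and the name of the cache client controller service is just an example):

Release Signal Identifier: ${fragment.identifier}
Target Signal Count: ${fragment.count}
Expiration Duration: 10 min
Distributed Cache Service: DistributedMapCacheClientService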

After my Wait processor, in case of error, I’m using an UpdateAttribute to retain this error:

Screen Shot 2018-06-26 at 2.08.50 PM.png

And after that, I’m using the PutSQL processor to update the ZIP monitoring table by executing the below query:

Screen Shot 2018-06-26 at 2.09.46 PM.png

The ‘wait.*’ attributes are generated by the Wait processor when releasing the flow file and contain the number of signals sent by the Notify processor (I’ll come back to that when describing the corresponding Notify processor). In this case, it allows me to know how many files contained in the ZIP file have been successfully processed and how many have not been successfully processed.
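
A sketch of what this update could look like (an assumption based on the table definitions and the attributes described above; the status below is derived from the failure counter only, whereas the actual workflow would also take into account the error retained by the UpdateAttribute processor):

UPDATE zip_files_processing
SET storage_date = current_timestamp,
    number_files = ${fragment.count},
    number_files_OK = ${wait.counter.OK:replaceNull("0")},
    number_files_KO = ${wait.counter.KO:replaceNull("0")},
    status = '${wait.counter.KO:replaceNull("0"):equals("0"):ifElse("OK", "KO")}'
WHERE zip_file_name = '${filename}';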

After the UnpackContent and the success relationship, I’m using a PutSQL to insert lines in my CSV monitoring table by executing the below query:

Screen Shot 2018-06-26 at 2.19.41 PM

The ‘segment.original.filename’ attribute is the name of the ZIP file from which the CSV files have been extracted.
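
A sketch of what this insert could look like (an assumption based on the files_ingestion table definition; the actual query is in the screenshot):

INSERT INTO files_ingestion (zip_file_name, file_name, ff_uuid, status)
VALUES ('${segment.original.filename}', '${filename}', '${uuid}', 'IN_PROGRESS');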

Let’s move to the final part of the workflow (it’s a bit loaded because I wanted to have everything in my screenshot):

Screen Shot 2018-06-26 at 2.27.15 PM.png

The MergeRecord is used to merge the data together while converting from CSV to Avro:

Screen Shot 2018-06-26 at 2.28.11 PM

In a real-world use case, you would configure the processor to merge much more data together… but for the below demonstration I’m merging a few flow files together so that I can quickly see the results.

Also I’m generating dummy data like:

Screen Shot 2018-06-26 at 2.29.41 PM.png

And the corresponding schema I’m using in the Avro Schema Registry controller service is:

Screen Shot 2018-06-26 at 2.30.55 PM.png

After the MergeRecord processor, there is the ‘original’ relationship, to which the flow files used during the merge operation are routed, and the ‘merged’ relationship, to which the flow files containing the merged data are routed. As explained above, the original flow files contain a ‘merge.uuid’ attribute with the UUID of the merged flow file. That’s the attribute I’m using in the Wait processor configuration:

Screen Shot 2018-06-26 at 2.35.49 PM

Basically, I’m creating a signal identifier in the distributed cache with the ‘merge.uuid’ attribute and, as long as the signal counter is greater than 1, I’m releasing one flow file at a time. You’ll see in my Notify configuration (coming back to that in a bit) that, once my merged data is processed, I’m using the Notify processor to set the signal counter associated with the identifier to a value equal to the number of merged flow files. This way, once the notification arrives, all the corresponding flow files will be released one by one.

I won’t describe the PutFile processor which is used to store my data (it could be a completely different processor), but just notice the UpdateAttribute I’m using in case of failure:

Screen Shot 2018-06-26 at 2.40.47 PM.png

And the UpdateAttribute I’m using in case of success:

Screen Shot 2018-06-26 at 2.41.24 PM

Note that I’m setting attributes with the same prefix ‘wn_ingestion.*’. Then, look at my Notify processor:

Screen Shot 2018-06-26 at 3.13.05 PM.png

The ‘uuid’ attribute is equal to the ‘merge.uuid’ I mentioned above for the original flow files used in the merge operation. Besides, I’m using the Attribute Cache Regex property to copy the corresponding attributes to all the flow files that are going to be released thanks to this signal. It means that when a merged flow file is sent to the Notify processor, all the original flow files with the corresponding ‘merge.uuid’ value in the ‘wait’ relationship will be released and will get the ‘wn*’ attributes of the merged flow file. That’s how I’m “forwarding” the information about when the merged flow file has been sent to the destination, whether the merged flow file has been successfully processed or not, and what filename was used to save the file at the destination.
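
To relate the screenshot to the explanation, the key properties of this Notify processor would look roughly like this (a sketch; the use of the merge.count attribute and the exact cache regex are assumptions based on the description above):

Release Signal Identifier: ${uuid}
Signal Counter Delta: ${merge.count}
Attribute Cache Regex: wn.*
Distributed Cache Service: DistributedMapCacheClientService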

Once the original flow files are released from the Wait processor, they are sent to the Notify processor with the below configuration:

Screen Shot 2018-06-26 at 3.18.24 PM.png

In this case, I’m using the attribute inherited from the merged flow file sent to the destination to determine if the processing has been successful or not. If yes, then I’m setting the counter name to ‘OK’, otherwise it’s set to ‘KO’. That’s how I’m getting the information about how many CSV files from a given ZIP have been successfully processed or not. (remember the ‘wait.counter.OK’ and ‘wait.counter.KO’ attributes created when the flow file corresponding to the ZIP file is released?).

And now… I can execute my SQL query to update my CSV monitoring table with the latest information:

Screen Shot 2018-06-26 at 3.22.09 PM
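
A sketch of what this final update could look like (the 'wn_ingestion.*' attribute names below are hypothetical, the real ones are defined in the UpdateAttribute processors shown earlier; the actual query is in the screenshot):

UPDATE files_ingestion
SET storage_date = current_timestamp,
    storage_name = '${wn_ingestion.storage.name}', -- hypothetical attribute name
    status = '${wn_ingestion.status}'              -- hypothetical attribute name
WHERE ff_uuid = '${uuid}';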

Demonstration

Let’s see how the workflow works and what the result is in all kinds of situations. I’m running a Postgres instance in a Docker container to store my monitoring data. Here are the statements I used to create the two tables:

CREATE TABLE IF NOT EXISTS zip_files_processing (
 zip_file_name varchar(255),
 ff_uuid varchar(255),
 ingestion_date timestamp DEFAULT current_timestamp,
 storage_date timestamp,
 number_files integer,
 number_files_OK integer,
 number_files_KO integer,
 status varchar(15)
);
CREATE TABLE IF NOT EXISTS files_ingestion (
 zip_file_name varchar(255),
 file_name varchar(255),
 ff_uuid varchar(255),
 storage_date timestamp,
 storage_name varchar(255),
 status varchar(15)
);

Case 1 – nominal situation

In this case, all the received data is OK and all is working flawlessly… Let’s say I’m receiving two ZIP files. Once the data is received, I can see something like:

Screen Shot 2018-06-15 at 11.53.58 PM

 

Then the ZIP files are unpacked, and I can see my CSV files in the other table:

Screen Shot 2018-06-15 at 11.54.42 PM

Then the data is merged and sent to the destination. At the end, my tables look like:

Screen Shot 2018-06-15 at 11.56.28 PM.png

I can see that the data coming from the two ZIP files has been merged into the same final file and everything is OK. In the ZIP monitoring table, I have:

Screen Shot 2018-06-15 at 11.57.55 PM

Case 2 – cannot fetch ZIP file

In this case, the FetchFile processor is failing due to some permission issues. Here is what I get in my ZIP monitoring table:

Screen Shot 2018-06-16 at 12.04.35 AM

Note that I could add a ‘comment’ column with the error cause.

Case 3 – cannot unpack ZIP file

The result will be identical to case 2 as I’m handling the situation in the same way.

Case 4 – cannot merge records because a CSV file is not schema valid

I’m processing 2 ZIP files but a CSV file contained in one of the ZIP files cannot be merged with the others because it does not respect the schema. Note that all the flow files used during a merge operation will be sent to the failure relationship if one of the flow files is not valid. That’s why multiple flow files will be routed to the failure relationship even though there is only one bad file. To avoid this situation, it would be possible to add an additional step by using the ValidateRecord processor.

For my ZIP files, I have:

Screen Shot 2018-06-22 at 8.42.00 PM

At the end, for this run, I have:

 

Screen Shot 2018-06-22 at 8.44.01 PM.png

Case 5 – cannot send a merged file to destination

In this case I’m simulating an error when sending a merged flow file to the destination. Here is the result at the end for the ZIP table:

Screen Shot 2018-06-22 at 9.57.09 PM

And the CSV files table:

Screen Shot 2018-06-22 at 9.57.44 PM

Conclusion

I’ve covered a non-exhaustive list of situations and the workflow could be improved, but it gives you a good idea of how the Wait/Notify pattern can be used in a NiFi workflow to help monitor what’s going on when dealing with split/merge situations. In addition to that, it’d be easy to add an automatic mechanism to try replaying failed flow files using the UUID and the provenance repository replay feature of NiFi.

Thanks for reading this post and, as always, feel free to comment and/or ask questions!

Automate workflow deployment in Apache NiFi with the NiFi Registry

Apache NiFi 1.6 (release note) is now out and one of the great new features is the addition of a Command Line Interface in the NiFi Toolkit binary that allows you to interact with NiFi instances and NiFi Registry instances.

In this post, I’ll discuss the ways you have to automate deployment and promotion of workflows between multiple environments using the NiFi Registry. I’ll demonstrate two options you need to know about:

  • the NiFi CLI released with the NiFi Toolkit 1.6.0 (download) (github)
  • The NiPyAPI python library 0.8.0 (github)

Before going into the details, let’s discuss the different scenarios we have… First, how many NiFi Registry instances do you have? Depending on your security requirements, you might not be allowed to have a single NiFi Registry available from all your environments. In that case, you could be in a situation where you need to have a NiFi Registry instance running for each of your NiFi environments. And, in such a case, you’d need to export/import the workflow information between the NiFi Registry instances to benefit from all the great features you have with the Registry.

  • Scenario 1 – one Registry to rule them all

Screen Shot 2018-03-27 at 4.57.03 PM

  • Scenario 2 – one Registry per environment

Screen Shot 2018-03-27 at 5.02.27 PM

Then you have two kinds of situations to handle:

  • Deployment of a completely new workflow from one environment to another
  • Update to a newer version of an existing workflow in production

I’ll go into the details of each of the 4 cases using each of the two clients. It’s up to you to decide which client you prefer to use. For each one of the cases I’ll only do one “hop” from Dev to Prod, but I’d expect you to have at least one additional hop to actually test your workflow in an integration environment.

Note 1 – I won’t use a secured environment in the following demos, but both tools support secured deployments so that you can authenticate using certificates to perform the actions as a given user having the appropriate authorizations.

Note 2 – For this post, I chose to use the following workflow:

For the first version of the workflow I will have:

ListFile => FetchFile => PutHDFS

Then I’ll add a connection from FetchFile to also push the data into Elasticsearch by adding a PutElasticsearchHttp processor. The idea is also to show you how to handle variables that need to be changed according to the environment.

Before starting anything… how do you configure your NiFi instance/cluster to communicate with the NiFi Registry? To do that, you need to go to the top right menu, open Controller Settings, and go to the Registry Clients tab:

NiFi Menu

Configure Registry Client

List of registry clients

In scenario 1, both the NiFi dev and the NiFi prod are configured to communicate with the same NiFi Registry instance. In scenario 2, each environment is configured with its own NiFi Registry instance.

Important – even if case 1 is not the case you are interested in, that’s where I’m describing the initial workflow and the differences between versions 1 and 2. Please have a look as I won’t go into as much detail when describing the other cases.


  • Case 1 – One NiFi Registry using the NiFi CLI

Environment details:
NiFi Dev – http://nifi-dev:8080/nifi
NiFi Prod – http://nifi-prod:8080/nifi
NiFi Registry – http://nifi-registry:18080/nifi-registry

First of all, I create a process group that will contain my workflow. I call this process group “My Project”.

Process Group

Then, I design my workflow:

Screen Shot 2018-03-30 at 1.10.57 PM

I have externalized some properties because I know the values will change depending on the environment my workflow is running in. To use process group variables in a property, you first need to check that Expression Language is supported. Besides, you cannot use variables for sensitive properties (I’ll come back to this point at the end of this article). To check if expression language is supported on a property, you can hover over the question mark next to it:

Screen Shot 2018-03-30 at 1.12.58 PM

To add variables for a process group, you need to right click in the canvas (when inside the process group) and click on “variables”, or directly right click on the process group and click on “variables”:

Screen Shot 2018-03-30 at 1.14.57 PM

Here I define my variables that I will use in the configuration of the processors belonging to my workflow:

Screen Shot 2018-03-30 at 1.15.57 PM.png

This view allows you to define key/value pairs that can be referenced in components using the expression language. You can also view the list of the components referencing the variables you create.

Here is the configuration of my ListFile processor:

Screen Shot 2018-03-30 at 1.18.14 PM

Here is the configuration of my FetchFile processor:

Screen Shot 2018-03-30 at 1.18.22 PM.png

Here is the configuration of my PutHDFS processor:

Screen Shot 2018-03-30 at 1.18.32 PM.png

As you can see I’m using external variables for:

– the input directory used in ListFile
– the list of configuration files used in PutHDFS
– the Kerberos principal used in PutHDFS
– the Kerberos keytab used in PutHDFS
– the output directory used in PutHDFS

The next step is to create a bucket for my project in the NiFi Registry. To do that, go to the NiFi Registry home page, and go to Settings (top right):

Screen Shot 2018-03-30 at 1.32.10 PM.png

I create a bucket for the project I’m working on:

Screen Shot 2018-03-30 at 1.33.00 PM.png

A bucket is a logical place to store versioned items/resources, and it’s at the bucket level that permissions/authorizations are assigned to users/groups. Currently, the only resource type stored in buckets is versioned flows (or workflows). Each versioned flow has a name, a description, and 1 or more “snapshots” (versions). Each snapshot (or version) has metadata and content: the metadata contains a version number, a commit message, an author, and a commit date; the content contains the representation of the workflow itself when it has been committed.

Once my bucket is created, I can start versioning my workflow in the registry. To do that, right click on the process group and start version control:

Screen Shot 2018-03-30 at 1.40.39 PM.png

Choose the bucket in which you want to register your workflow:

Screen Shot 2018-03-30 at 1.42.21 PM

Once your workflow is version controlled in the Registry, you should be able to see it in the Registry:

Screen Shot 2018-03-30 at 1.43.45 PM.png

Also, in NiFi, you can see that your process group is now versioned and up-to-date with the Registry (green icon in the top left corner of the process group):

Screen Shot 2018-03-30 at 2.02.20 PM.png

We are now ready to deploy this workflow in production! Here is what we’re going to do:

– import the versioned workflow in production
– update the variables to set the values for the production environment
– start the process group

For the demo I’ll use the CLI in its interactive shell mode, but you can script the commands (have a look here).

$ ./cli.sh
 _ ___ _
 Apache (_) .' ..](_) ,
 _ .--. __ _| |_ __ )\
[ `.-. | [ |'-| |-'[ | / \
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
 `'
 CLI v1.6.0

Type 'help' to see a list of available commands, use tab to auto-complete.

Session loaded from /Users/pvillard/.nifi-cli.config

#>

Let’s list the buckets in my Registry:

#> registry list-buckets -u http://nifi-registry:18080

# Name      Id                                   Description
- --------- ------------------------------------ -----------
1 MyProject 2a5566b5-7380-46b8-9598-328e092e8899 (empty)

Let’s list the workflows in the bucket referenced by the ID 1 (I’m using back reference, but you could use the full identifier for fully scripted solutions):

#> registry list-flows -b &1 -u http://nifi-registry:18080

Using a positional back-reference for 'MyProject'

# Name       Id                                   Description
- ---------- ------------------------------------ ----------------------------------------
1 MyWorkflow 9c2874ca-673a-4553-bbb1-5b370ff23b70 This a workflow to demonstrate workfl...

I now want to deploy the last version (-fv 1) of this workflow in my production NiFi:

#> nifi pg-import -b &1 -f &1 -fv 1 -u http://nifi-prod:8080

Using a positional back-reference for 'MyProject'

Using a positional back-reference for 'MyWorkflow'

76f015f8-0162-1000-5470-a7c6511e5685

My process group is now imported in the production NiFi with everything stopped since it’s the first time I deploy it. If I didn’t want to deploy it at the root level of my production NiFi I could have specified a parent process group ID in the previous command.

Let’s now list the variables of this process group:

#> nifi pg-get-vars -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -u http://nifi-prod:8080

# Name                    Value
- ----------------------- ----------------------------------------
1 HDFSconfFiles           /Users/pvillard/Documents/nifi-workdir/fieldcloud/core-site.xml,/Users/pvillard/Documents/nifi-workdir/fieldcloud/hdfs-site.xml
2 HDFSdirectory           /dev/dest/directory
3 HDFSkeytab              myproject-dev.keytab
4 HDFSprincipal           myproject-dev@EXAMPLE.COM
5 ListFilesInputDirectory /dev/tmp

I can now update the variables with the values of my production environment:

#> nifi pg-set-var -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -var HDFSconfFiles -val /etc/hadoop/conf/current/hdfs-site.xml,/etc/hadoop/conf/current/core-site.xml -u http://nifi-prod:8080
#> nifi pg-set-var -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -var HDFSdirectory -val /prod/dest/directory -u http://nifi-prod:8080
#> nifi pg-set-var -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -var HDFSkeytab -val myproject-prod.keytab -u http://nifi-prod:8080
#> nifi pg-set-var -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -var HDFSprincipal -val myproject-prod@EXAMPLE.COM -u http://nifi-prod:8080
#> nifi pg-set-var -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -var ListFilesInputDirectory -val /prod/tmp -u http://nifi-prod:8080

I can confirm in the UI of my production NiFi that the variables have been updated:

Screen Shot 2018-03-30 at 3.11.44 PM.png

I just need to start my process group and we’re done:

#> nifi pg-start -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -u http://nifi-prod:8080

We’ve successfully deployed a new workflow in production. Let’s now update our workflow in the development NiFi and create a new version of it:

Screen Shot 2018-03-30 at 3.38.55 PM.png

I’ve added a PutElasticsearchHttp processor to also send my data into Elasticsearch. Here is the configuration of my processor:

Screen Shot 2018-03-30 at 3.08.41 PM.png

And I’ve added a new variable for the URL of my Elasticsearch instance:

Screen Shot 2018-03-30 at 3.10.50 PM.png

I can see on my process group, that I now have local changes to commit in the NiFi Registry:

Screen Shot 2018-03-30 at 3.14.41 PM.png

To commit the changes, right click, version, commit local changes:

Screen Shot 2018-03-30 at 3.15.27 PM.png

You can comment your changes before committing the changes in the NiFi Registry:

Screen Shot 2018-03-30 at 3.16.32 PM.png

My process group is now up-to-date and I can see the new version in the Registry:

Screen Shot 2018-03-30 at 3.17.33 PM.png

In production I can see that my process group is not up-to-date anymore and a new version is available:

Screen Shot 2018-03-30 at 3.18.52 PM.png

To update the production environment to the latest version, you just need to do the following with the NiFi CLI:

#> nifi pg-change-version -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -u http://nifi-prod:8080

Not only is the flow updated, but there is no downtime: running processors remain running, and previously existing variables remain unchanged. You just have to set the new variable and start the process group to start the stopped PutElasticsearchHttp processor:

#> nifi pg-get-vars -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -u http://nifi-prod:8080

# Name                    Value
- ----------------------- ----------------------------------------
1 HDFSconfFiles           /etc/hadoop/conf/current/hdfs-site.xml,/etc/hadoop/conf/current/core-site.xml
2 HDFSdirectory           /prod/dest/directory
3 HDFSkeytab              myproject-prod.keytab
4 HDFSprincipal           myproject-prod@EXAMPLE.COM
5 ListFilesInputDirectory /prod/tmp
6 ElasticsearchURL        http://es-dev:9200

#> nifi pg-set-var -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -var ElasticsearchURL -val http://es-prod:9200 -u http://nifi-prod:8080
#> nifi pg-start -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -u http://nifi-prod:8080

You now have the latest version of your workflow in production and you didn’t experience any downtime since you didn’t modify your source processor at all.

Screen Shot 2018-03-30 at 4.15.09 PM.png


  • Case 2 – One NiFi Registry using NiPyAPI

Environment details:
NiFi Dev – http://nifi-dev:8080/nifi
NiFi Prod – http://nifi-prod:8080/nifi
NiFi Registry – http://nifi-registry:18080/nifi-registry

(it is recommended to read case 1 first as it gives an overview of the overall story)

The first version of my workflow has been committed in the NiFi Registry, and I want to deploy it in my production NiFi for the first time.

I assume that you already have a Python environment where you can use the nipyapi library (please refer to the documentation if needed).

>>> help(nipyapi)
...
VERSION
 0.8.0

AUTHOR
 Daniel Chaffelson
...

>>> devNiFi = 'http://nifi-dev:8080/nifi-api'
>>> registry = 'http://nifi-registry:18080/nifi-registry-api'
>>> prodNiFi = 'http://nifi-prod:8080/nifi-api'

Right now there is no method available to deploy a versioned workflow as a new process group for the first time. This will be added in the next version of the library. In the meantime, you can use the below option:

def deploy_flow_version(parent_pg, location, bucketId, flowId, registryID, ver):
  assert isinstance(parent_pg, nipyapi.nifi.ProcessGroupEntity)
  assert isinstance(location, tuple)
  try:
    return nipyapi.nifi.ProcessgroupsApi().create_process_group(
      id=parent_pg.id,
      body=nipyapi.nifi.ProcessGroupEntity(
        revision=parent_pg.revision,
        component=nipyapi.nifi.ProcessGroupDTO(
          position=nipyapi.nifi.PositionDTO(
            x=float(location[0]),
            y=float(location[1])
          ),
          version_control_information=nipyapi.nifi.VersionControlInformationDTO(
            bucket_id=bucketId,
            flow_id=flowId,
            registry_id=registryID,
            version=ver
          )
        )
      )
    ).id
  except nipyapi.nifi.rest.ApiException as e:
    raise e

I can now create a process group with my versioned workflow:

>>> nipyapi.utils.set_endpoint(prodNiFi)
True
>>> nipyapi.utils.set_endpoint(registry)
True
>>> bucketName = "MyProject"
>>> workflowName = "MyWorkflow"
>>> registryName = "Registry"
>>>
>>> rootPgId = nipyapi.canvas.get_root_pg_id()
>>> rootPg = nipyapi.canvas.get_process_group(rootPgId, identifier_type='id')
>>> bucketID = nipyapi.versioning.get_registry_bucket(bucketName).identifier
>>> workflowID = nipyapi.versioning.get_flow_in_bucket(bucketID, workflowName).identifier
>>> registryID = nipyapi.versioning.get_registry_client("Registry").id
>>> ver = 1
>>> location = (200, 200)
>>>
>>> deploy_flow_version(rootPg, location, bucketID, workflowID, registryID, ver)
'01621009-0a4c-1704-e137-e564eadb11e2'

With the above list of commands we have retrieved the bucket ID, the workflow ID, the ID of the registry client in the production NiFi, and we deployed the versioned flow at the root level (using the root process group) at a given location.

The last command returns the ID of the newly created process group that contains our versioned workflow.

We can now update the variables of the process group:

versionedPg = nipyapi.canvas.get_process_group('01621009-0a4c-1704-e137-e564eadb11e2', identifier_type='id')
nipyapi.canvas.update_variable_registry(versionedPg, [('HDFSprincipal', 'myproject-prod@EXAMPLE.COM'), ('HDFSkeytab', 'myproject-prod.keytab'), ...])

And we now just have to start the process group:

>>> nipyapi.canvas.schedule_process_group('01621009-0a4c-1704-e137-e564eadb11e2', True)
True

Let’s now imagine that we have a new version available for our workflow and we want to update it to the latest version. Then we just need to do the following:

>>> versionedPg = nipyapi.canvas.get_process_group('01621009-0a4c-1704-e137-e564eadb11e2', identifier_type='id')
>>> nipyapi.versioning.update_flow_ver(versionedPg)

We can now update the variables if needed, and start the newly added processors as we did at the end of case 1 description.


  • Case 3 – Two NiFi Registries using the NiFi CLI

Environment details:
NiFi Dev – http://nifi-dev:8080/nifi
NiFi Prod – http://nifi-prod:8080/nifi
NiFi Registry Dev – http://registry-dev:18080/nifi-registry
NiFi Registry Prod – http://registry-prod:18080/nifi-registry

(it is recommended to read case 1 first as it gives an overview of the overall story)

In this case, security requirements prevent us from having a single NiFi Registry reachable from all our NiFi environments. Consequently, we have one Registry instance per environment and we have to set up a mechanism to move versioned workflows from one Registry instance to another. I will go very quickly over the basic functionalities of the CLI (that I described in the previous cases) and focus on the export/import between two registries.

I assume that each NiFi environment has been configured to register its Registry in the controller settings. If not, refer to the beginning of the article.

Starting point: the initial version of the workflow is committed in the Dev Registry and should be deployed in production.

I first list the buckets in my Dev Registry, list the flows in the bucket I want, and export the flow I’m looking for at the given version into a JSON file:

#> registry list-buckets -u http://registry-dev:18080

# Name      Id                                   Description
- --------- ------------------------------------ -----------
1 MyProject 2a5566b5-7380-46b8-9598-328e092e8899 (empty)

#> registry list-flows -b &1 -u http://registry-dev:18080

Using a positional back-reference for 'MyProject'

# Name       Id                                   Description
- ---------- ------------------------------------ -----------
1 MyWorkflow 3403c78c-7074-45c3-bc3b-aeac75970e85

#> registry export-flow-version -f &1 -fv 1 -o /tmp/my-flow.json -ot json -u http://registry-dev:18080

Using a positional back-reference for 'MyWorkflow'

Since this is the first time I deploy this workflow in production, it’s possible I don’t have a bucket for it yet in the Prod Registry. Let’s create one, and let’s create the workflow resource in the bucket so that we can do the import:

#> registry create-bucket -bn "MyProjectProd" -u http://registry-prod:18080

33aacd91-ca5d-4405-a657-05de37da1fb7

#> registry list-buckets -u http://registry-prod:18080

# Name          Id                                   Description
- ------------- ------------------------------------ -----------
1 MyProjectProd 33aacd91-ca5d-4405-a657-05de37da1fb7 (empty)

#> registry create-flow -b &1 -fn "MyWorkflowProd" -u http://registry-prod:18080

Using a positional back-reference for 'MyProjectProd'

67576995-fb0f-4324-987a-dfcf186a24c8

We can now import the versioned workflow that we exported as JSON file from the Dev Registry into the Prod Registry:

#> registry list-buckets -u http://registry-prod:18080

# Name          Id                                   Description
- ------------- ------------------------------------ -----------
1 MyProjectProd 33aacd91-ca5d-4405-a657-05de37da1fb7 (empty)

#> registry list-flows -b &1 -u http://registry-prod:18080

Using a positional back-reference for 'MyProjectProd'

# Name           Id                                   Description
- -------------- ------------------------------------ -----------
1 MyWorkflowProd 67576995-fb0f-4324-987a-dfcf186a24c8 (empty)

#> registry import-flow-version -f &1 -i /tmp/my-flow.json -u http://registry-prod:18080

Using a positional back-reference for 'MyWorkflowProd'

1

Now the versioned workflow is available in the Prod Registry, and we can do exactly the same as we did in case 1 to deploy it in the production NiFi.

Assuming a new version of the workflow is available in the Dev Registry, you just need to export the version of your choice (using the option -fv) and import it in the existing bucket/workflow in the Prod Registry. The version on the Prod Registry side will be automatically incremented. One benefit of this approach/separation with two registries is that you can move from one registry to another *only* the versions that are considered ready enough to be promoted in the next environment. The full lifecycle workflow could be represented like this:

Screen Shot 2018-03-30 at 6.50.15 PM.png

Using the explanations in both cases 1 and 3, you should be able to manage the complete lifecycle deployment of your workflows using the CLI.


  • Case 4 – Two NiFi Registries using the NiPyAPI

Environment details:
NiFi Dev – http://nifi-dev:8080/nifi
NiFi Prod – http://nifi-prod:8080/nifi
NiFi Registry Dev – http://registry-dev:18080/nifi-registry
NiFi Registry Prod – http://registry-prod:18080/nifi-registry

(it is recommended to read case 1 first as it gives an overview of the overall story)

In this case, security requirements prevent us from having a single NiFi Registry reachable from all our NiFi environments. Consequently, we have one Registry instance per environment and we have to set up a mechanism to move versioned workflows from one Registry instance to another. I will go very quickly over the basic functionalities of NiPyAPI (that I described in the previous cases) and focus on the export/import between two registries.

I assume that each NiFi environment has been configured to register its Registry in the controller settings. If not, refer to the beginning of the article.

Starting point: the initial version of the workflow is committed in the Dev Registry and should be deployed in production.

I export the flow I’m looking for, at the given version (in this case, version 1), into a JSON file:

>>> devNiFi = 'http://nifi-dev:8080/nifi-api'
>>> devRegistry = 'http://registry-dev:18080/nifi-registry-api'
>>> prodNiFi = 'http://nifi-prod:8080/nifi-api'
>>> prodRegistry = 'http://registry-prod:18080/nifi-registry-api'
>>> bucketName = "MyProject"
>>> workflowName = "MyWorkflow"

>>> nipyapi.utils.set_endpoint(devRegistry)
>>> bucketID = nipyapi.versioning.get_registry_bucket(bucketName).identifier
>>> workflowID = nipyapi.versioning.get_flow_in_bucket(bucketID, workflowName).identifier
>>> nipyapi.versioning.export_flow_version(bucketID, workflowID, version='1', file_path='/tmp/my-flow.json', mode='json')

Since this is the first time I deploy this workflow in production, it’s possible I don’t have a bucket for it yet in the Prod Registry. Let’s create one so that we can do the import:

>>> nipyapi.utils.set_endpoint(prodRegistry)
True
>>> nipyapi.utils.set_endpoint(prodNiFi)
True
>>> bucketNameProd = "MyProjectProd"
>>> nipyapi.versioning.create_registry_bucket(bucketNameProd)
{'created_timestamp': 1522430434276,
 'description': None,
 'identifier': '8a9e46b0-722e-40f6-9401-98103de56435',
 'link': {'params': {'rel': 'self'},
 'rel': None,
 'rels': None,
 'title': None,
 'type': None,
 'uri': None,
 'uri_builder': None},
 'name': 'MyProjectProd',
 'permissions': {'can_delete': True, 'can_read': True, 'can_write': True}}
>>> bucketProdID = nipyapi.versioning.get_registry_bucket(bucketNameProd).identifier

I can now import my versioned workflow in this bucket:

>>> workflowNameProd = "MyWorkflowProd"
>>> nipyapi.versioning.import_flow_version(bucketProdID, encoded_flow=None, file_path='/tmp/my-flow.json', flow_name=workflowNameProd, flow_id=None)

Now the versioned workflow is available in the Prod Registry, and we can do exactly the same as we did in case 2 to deploy it in the production NiFi.

Let’s assume you make modifications in the Dev NiFi and you want to deploy the new version of the workflow in the existing bucket of the Prod Registry. Then you would do the following (assuming you already have the export, as before, in /tmp/my-flow.json):

>>> workflowProdID = nipyapi.versioning.get_flow_in_bucket(bucketProdID, workflowNameProd).identifier
>>> nipyapi.versioning.import_flow_version(bucketProdID, encoded_flow=None, file_path='/tmp/my-flow.json', flow_name=None, flow_id=workflowProdID)

And you can now update the process group of the production NiFi to the latest version using the commands already described in case 2.


  • Using the CLI in a non-interactive mode

Refer to the CLI documentation for more details, but you’d probably use the CLI in non-interactive mode in your deployments.

I create 4 configuration files:

nifi-dev.properties

baseUrl=http://nifi-dev:8080
keystore=
keystoreType=
keystorePasswd=
keyPasswd=
truststore=
truststoreType=
truststorePasswd=
proxiedEntity=

nifi-prod.properties

baseUrl=http://nifi-prod:8080
keystore=
keystoreType=
keystorePasswd=
keyPasswd=
truststore=
truststoreType=
truststorePasswd=
proxiedEntity=

registry-dev.properties

baseUrl=http://registry-dev:18080
keystore=
keystoreType=
keystorePasswd=
keyPasswd=
truststore=
truststoreType=
truststorePasswd=
proxiedEntity=

registry-prod.properties

baseUrl=http://registry-prod:18080
keystore=
keystoreType=
keystorePasswd=
keyPasswd=
truststore=
truststoreType=
truststorePasswd=
proxiedEntity=

I can now use the following commands to automatically export/import a given bucket, workflow, version from the Dev Registry to the Prod Registry. In order to ease the process, I’m specifying “json” as the output type, and I’m using the jq command to parse the results.

#!/bin/sh

set -e

# Set the variables
BUCKET="MyProject"
WORKFLOW="MyWorkflow"
BUCKETPROD="MyProjectProd"
WORKFLOWPROD="MyWorkflowProd"
VERSION=1
FILE="/tmp/my-flow.json"

set_endpoints() {
 ./cli.sh session set nifi.props $1
 ./cli.sh session set nifi.reg.props $2
}

get_bucketid() {
 result=$(./cli.sh registry list-buckets -ot json | jq '.[] | select(.name=="'$1'") | .identifier')
 if [ -z "$result" ]; then
   >&2 echo "No bucket with name $1"
   return 1
 else
   echo $result
 fi
}

get_workflowid() {
 result=$(./cli.sh registry list-flows -b $1 -ot json | jq '.[] | select(.name=="'$2'") | .identifier')
 if [ -z "$result" ]; then
   >&2 echo "No workflow with name $2 in bucket $1"
   return 1
 else
   echo $result
 fi
}

# Set the endpoints
set_endpoints "nifi-dev.properties" "registry-dev.properties"

# Export the workflow
BUCKETID=$(get_bucketid $BUCKET)
WORKFLOWID=$(get_workflowid $BUCKETID $WORKFLOW)
./cli.sh registry export-flow-version -f $WORKFLOWID -fv $VERSION -o $FILE -ot json

# Change the endpoints
set_endpoints "nifi-prod.properties" "registry-prod.properties"

# Import the workflow
BUCKETID=$(get_bucketid $BUCKETPROD)
WORKFLOWID=$(get_workflowid $BUCKETID $WORKFLOWPROD)
./cli.sh registry import-flow-version -f $WORKFLOWID -i $FILE

I’m sure you are all set to script all the operations you need.

Also, be aware that there is a command

registry transfer-flow-version

that you can use to do the export/import I described above:

#> registry transfer-flow-version help

Transfers a version of a flow directly from one Registry to another,
without needing to export/import. If --sourceProps is not specified,
the source flow is assumed to be in the same registry as the
destination flow. If --sourceFlowVersion is not specified, then the
latest version will be transferred

Also, instead of using the ‘session’ command to set the endpoint, you can directly use the properties file in the command using the ‘-p’ option:

./bin/cli.sh registry list-buckets -p /path/to/local-nifi-registry.properties

Using the ‘session’ command will change the default settings of the interactive shell for the next time you’re using it.


  • What is next? Are there some limitations?

There are already a few JIRAs open to improve the CLI, and Dan Chaffelson is very active and keen to improve his library. So it will definitely improve over time on both sides and, as I said, in the future, the Registry will also store much more than just workflows.

If you want to see new features in the CLI, feel free to open a JIRA here or send an email to the users mailing list of the Apache NiFi project. If you want to see improvements in the Python library, you can quickly and easily interact on the Github repository.

A few limitations to keep in mind right now:

  1. Support for embedded versioned process groups is not fully ready yet when you have multiple Registry instances and you need to export/import things between the instances. You can check the status on NIFI-5029.
  2. When versioning a process group and deploying it in NiFi, all the sensitive properties are emptied for security reasons. Just like the variables, you would need to set the values using the REST API or NiPyAPI (no support in the CLI yet) when you deploy the workflow for the first time (or when adding new sensitive properties between two versions); see the REST sketch after this list. Regarding support in the CLI, you can check the status on NIFI-5028.
  3. The example I gave is not using any controller service defined at process group level. If you’re using controller services, the CSs will be stopped when deploying the versioned workflow for the first time. Using pg-start won’t start the controller services; they need to be started before you can actually start the process group. There is no support for this in the CLI yet, but you can use the REST API or NiPyAPI. Regarding support in the CLI, you can check the status on NIFI-5027.
    >> Note: The improvement has been merged in master code of NiFi but didn’t make it in NiFi 1.6.0. You can, however, build the CLI from master code and use it with NiFi 1.6.0.
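
For reference, here is a hedged sketch of what setting a sensitive property through the REST API could look like (the URL, processor UUID, property name and revision version are placeholders; the revision version must match the one returned by a prior GET on /nifi-api/processors/<processor-uuid>, and the processor must be stopped):

curl -X PUT http://<nifi-prod-host>:8080/nifi-api/processors/<processor-uuid> \
  -H 'Content-Type: application/json' \
  -d '{
        "revision": { "clientId": "deploy-script", "version": 3 },
        "component": {
          "id": "<processor-uuid>",
          "config": { "properties": { "Password": "my-secret-value" } }
        }
      }'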

There is much more on the roadmap, so stay tuned!

I hope this overview is already going to help you a lot in automating the Flow Development Life Cycle (FDLC). It should integrate nicely in your DevOps pipelines and make your life much easier. Keep in mind that a lot of developments are currently in progress and that this article could quickly become outdated. I’ll do my best to keep it up-to-date or to post new articles in case of new major features.

As usual, thanks for reading me, and feel free to comment/ask questions.

Kafka SSL – Custom Principal Builder

I just pushed a repository on Github with code for a Custom Principal Builder when exposing Kafka brokers with SSL only.

The motivation behind this code is the following: some producers/consumers might not be able to use Kerberos to authenticate against Kafka brokers and, consequently, you can’t use SASL_PLAINTEXT or SASL_SSL. Since PLAINTEXT is not an option (for obvious security reasons), that leaves SSL.

When configuring a broker to use SSL, you will have authentication AND encryption if and only if two-way SSL is configured (by setting ssl.client.auth=required). It is strongly recommended to always set this property to required. (For additional information: https://docs.confluent.io/current/kafka/authentication_ssl.html#ssl-overview).
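
For context, here is a minimal sketch of the broker-side server.properties entries involved when enabling SSL with client authentication (hostnames, paths and passwords are placeholders):

listeners=SSL://broker1:9093
security.inter.broker.protocol=SSL
ssl.keystore.location=/path/to/kafka.broker.keystore.jks
ssl.keystore.password=<keystore-password>
ssl.key.password=<key-password>
ssl.truststore.location=/path/to/kafka.broker.truststore.jks
ssl.truststore.password=<truststore-password>
ssl.client.auth=required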

When two-way SSL is enabled, the client will be authenticated using the Subject of the client certificate used to perform the handshake. It means that if the Subject is CN=kafkaClient, OU=OrgUnit, O=My Company, you will have to set your topic ACLs to allow this user to consume/publish data.

When using Apache Ranger to manage authorizations for your Kafka brokers, it’s not great to have this kind of username… That’s why we want to define a custom principal builder to extract a username from the Subject of the certificate.

In the provided code, I want to expose two properties: a pattern that will be used to match the Subject and extract any capture groups I want, and a value to construct the username using the capture groups. I added the two properties below in the server.properties configuration file on the brokers’ side:

kafka.security.identity.mapping.pattern.dn=^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$
kafka.security.identity.mapping.value.dn=$1

In this case, I only want to extract the CN part of the Subject and use it as the username of the client. If needed, I could use more complex patterns but with my previous example, my client would be authenticated with kafkaClient as username. It’s now easier to define the authorizations on my topic using built-in ACLs or using Apache Ranger.
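
To make the mechanics of these two properties concrete, here is a minimal Groovy sketch of the extraction logic (this only illustrates how the pattern and value combine; it is not the actual principal builder code from the repository):

import java.util.regex.Pattern

// Values taken from the two server.properties entries above (illustrative only)
def dnPattern = Pattern.compile('^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$')
def dnValue   = '$1'

// Subject of the client certificate presented during the TLS handshake
def subjectDn = 'CN=kafkaClient, OU=OrgUnit, O=My Company'

def matcher = dnPattern.matcher(subjectDn)
// When the Subject matches, the $1 template is resolved against the capture groups
def principal = matcher.matches() ? matcher.replaceAll(dnValue) : subjectDn

println principal   // prints: kafkaClient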

Note: with Kafka 1.0+, the implementation changed a bit. Even though this code remains valid, there is a new interface that is much easier to implement (https://cwiki.apache.org/confluence/display/KAFKA/KIP-189%3A+Improve+principal+builder+interface+and+add+support+for+SASL) and it also provides the possibility to implement the principal builder when using SASL. For a Kafka 1.x version of this code, have a look at this branch.

FoD Paris Jan 18′ – NiFi Registry and workflow monitoring with a use case

I got the chance to talk a bit about NiFi during the last Future of Data meetup in Paris, and I wanted to share in a blog what I explained during this event.

First of all, I want to remind you that Apache NiFi 1.5.0, MiNiFi 0.4.0 and NiFi Registry 0.1.0 are out and it represents a huge step forward in the community as it brings flow development life cycle (FDLC) to a completely new level. Besides, this first version of the registry is a solid foundation for what will come in the future.

Ok let’s get back to what I discussed at the meetup.

  • Introduction to NiFi Registry

What is the NiFi Registry? It’s a new component of the NiFi ecosystem: a web-based application meant to be used as a central location for storage and management of shared resources across one or more instances of NiFi and/or MiNiFi.

In a typical multi-cluster environment, this could be represented as below:

Screen Shot 2018-02-02 at 2.03.45 PM.png

At the moment, the integration between NiFi and the NiFi Registry allows you to store, retrieve, and upgrade versioned flows. Next versions will allow users to store additional types of resources. And if you’re asking… yes, the NiFi Registry can be secured and provides authorization support to control users and access policies to the versioned flows.

Note that, even if you have a single environment, using the NiFi Registry will also make a lot of sense. Firstly, it’s a convenient way to save your work and to version it (and to be honest, this argument alone should convince you), but it’s also a practical way to share common reusable parts of flows between multiple workflows.

For example, let’s say you have a process group designed to perform a very common task like transforming XML data to JSON and adding attributes to the flow files. If you have to update this process group with a few changes and if this process group is used in multiple places, you don’t need to copy/paste your changes everywhere. You just have to version it and, as soon as you update it, you’ll be able to update all the other instances of this process group to the latest version.

  • NiFi monitoring NiFi

I already posted some blogs around NiFi monitoring, but that’s a recurring subject and there are quite a lot of options to answer whatever monitoring requirements you might have.

What do we mean by “monitoring”? In my opinion, we have two main subjects:

  1. Dashboarding: expose various metrics data to measure the application performance and stability.
  2. Alerting: send notifications when something is wrong.

In the context of NiFi, we can also have two types of monitoring:

  1. “Technical” monitoring: is the NiFi service working as expected? are the nodes up and alive?
  2. “Business” monitoring: is a particular workflow for a given project behaving as expected?

For both types of monitoring, my recommendation is to use the reporting tasks available in NiFi. Reporting tasks provide an efficient way to extract and process the data generated by the framework for technical related metrics as well as workflow related metrics.

  • Use case study

I’m going to articulate this blog around a very simple use case that will demonstrate the use of the NiFi Registry to perform FDLC tasks. And I’ll also use it as a baseline to show various ways of monitoring both the NiFi service and a particular workflow.

The use case is to expose an HTTP REST end point using NiFi so that an application can send data through HTTP calls to NiFi. The idea is to route the received data based on the URI used in the HTTP call. For each type of data, I’ll perform XML validation against a schema, XML to JSON conversion, perform simple data transformations and send the data into a Kafka topic.

  • NiFi monitoring: survival kit

Before going into the details of the use case, I’ll give my personal opinion of what should be the minimum setup for efficient NiFi monitoring. Here are the 6 reporting tasks that should be configured:

Screen Shot 2018-02-02 at 3.15.03 PM.png

# The Ambari reporting task is used to send metrics to the Ambari Metrics Service (obviously, this reporting task only makes sense if you have Ambari and AMS available, and, if not, there are similar reporting tasks for different systems). When using the reporting task, the following metrics will be available in Ambari on the NiFi service page:

Screen Shot 2018-02-02 at 3.51.48 PM.png

Things to monitor carefully are:

  1. JVM heap usage to avoid OutOfMemory errors. Even though NiFi does not require a lot of RAM, some processors/use cases might need more. It also depends on how many flow files NiFi is going to handle at one time.
  2. Active threads is the mean number of active threads per node in the NiFi cluster. By default, each NiFi node will initialize a pool of 10 threads for timer driven components. If you see that this metric is always equal to the size of the thread pool, then you might need to increase the thread pool size. But this number needs to be changed with care in coordination with CPU/load metrics of your NiFi nodes. To change the thread pool size, go into Controller Settings / General (change will be immediately effective, no restart required).
  3. Flow files queued is the number of flow files currently queued in NiFi. If this graph is continually increasing, it can be a symptom of something going wrong. It can totally be expected to have peaks at some specific hours of the day based on the use cases, and NiFi will protect itself using the backpressure mechanism but, in any case, it’s always good to keep an eye on this metric.
  4. Total task duration can also be something to look at: in case this graph is constantly going higher, it means that it takes more and more time to process the data. It can be expected if data volumes are changing or if workflows are updated, but it can also be the symptom of a processor not performing well.

# The MonitorDiskUsage reporting tasks are useful to be sure that bulletins will be emitted before having a “no space left on device” error message. I recommend setting one per repository (at least 3, but it can be more if you have multiple content/provenance repositories). In my screenshot, I defined one for the provenance repository, one for the content repository and one for the flow file repository.

# The Site to Site bulletin reporting task is used to reingest all the generated bulletins into NiFi. Generally speaking, Site to Site reporting tasks are a really powerful tool as they allow you to reingest data generated by NiFi’s framework into NiFi itself so you can use all the processors you want to process this data. With this specific reporting task, you’re able to deal with every generated bulletin in a dedicated workflow and do whatever suits you: email notification, dashboarding, automatic issue creation in JIRA, etc.

# The Site to Site status reporting task is used to frequently take a snapshot of what is going on in all the workflows existing on the canvas. It will generate a JSON array of JSON documents representing the status of every component (processors, ports, relationships, etc), including the last 5 minutes of statistics data.

  • NiFi monitoring NiFi

Using strict naming conventions, you can leverage generic workflows dedicated to monitoring and perform quite powerful things. My approach is really to use NiFi as a way to monitor itself. You might ask: what is going to monitor the workflows dedicated to monitoring? To be honest, if there is something wrong going on with the monitoring workflows, you will detect it very quickly through the technical monitoring / Ambari reporting task. Or the NiFi service will just be red in Ambari…

Consequently, my personal recommendation is to ask each project team to respect a strict naming convention and to deliver a workflow designed to fulfill the monitoring requirements for the project workflows. Each workflow will certainly require different notification mechanisms, and different metrics in the dashboards.

At the root level of the canvas, I’ll have one process group per project in order to ensure proper authorization policies. I’ll also have a process group dedicated to monitoring, and two input ports to receive the data sent by reporting tasks (with NIFI-4814, it’ll be possible to only use one):

Screen Shot 2018-02-07 at 2.34.36 PM

In this use case study, I assume that the naming convention is the following: <acronym of the project>_<meaningful component name>

Besides, if the metrics of a component should be captured for dashboarding, the name should be suffixed by _GRAFANA.

With the above assumptions, here is what I have in my monitoring process group:

Screen Shot 2018-02-07 at 2.39.16 PM

The UpdateAttribute is used to add a “type” attribute with the value “bulletins” or “status”. I’ll use this attribute to perform routing at a later stage.

The UpdateRecord is used to populate a “project” field by extracting the acronym from the component name. Here is the schema I use for bulletins, and here is the schema I use for status.

For bulletins:

Screen Shot 2018-02-07 at 2.46.21 PM.png

For status data:

Screen Shot 2018-02-07 at 2.46.41 PM

At this point, I have bulletins and status data and I’m able to link this data to every project running in NiFi. Now I send this data in two process groups: one that is dedicated to technical monitoring (used and managed by the team in charge of NiFi service), and one that is dedicated to business (per-project) monitoring:

Screen Shot 2018-02-07 at 2.49.20 PM

Technical monitoring

In the technical monitoring, it’s really up to you regarding what you want… What I usually do is send the bulletin data to a tool like AMS, Solr or Elasticsearch to have dedicated dashboards. I also set up email notifications when backpressure is enabled in a connection.

For bulletins dashboarding, I slightly change the schema of the data and send it into an Elasticsearch instance:

Screen Shot 2018-02-07 at 2.53.40 PM.png

I then create a dashboard in Grafana:

Screen Shot 2018-02-07 at 3.00.03 PM

If necessary I can use the notification mechanism available in Grafana to get email messages when some thresholds are reached.

With the status data, I decided to design a workflow sending email notifications as soon as backpressure is enabled in a connection:

Screen Shot 2018-02-07 at 9.28.30 PM

To do that, I use the QueryRecord processor to only select records representing connections with backpressure enabled:

Screen Shot 2018-02-07 at 9.30.32 PM.png

Again… with the Site to Site reporting tasks you are ingesting NiFi’s own data back into NiFi, and you can use all the processors you want to process this data and fulfill your requirements.

Business (per-project) monitoring

If we go back to the first level of my monitoring process group, you noticed that I’m also sending both bulletin and status data to another process group for “per-project” monitoring:

Screen Shot 2018-02-07 at 2.49.20 PM

In this process group, here is what I’m doing:

Screen Shot 2018-02-07 at 9.35.11 PM

I’m using a PartitionRecord processor to partition all the data based on the field /project that we computed earlier. This way, each flow file will only contain data with a unique value for the /project field and this value is also extracted as an attribute of the flow files. Then I just have to use a RouteOnAttribute processor to route the data based on the project that generated this data (if a project didn’t respect the naming convention, the project value will not match any project acronym and the data will be dropped, unless you want to process it somehow – to detect teams not respecting the naming convention!). After the RouteOnAttribute, you have one process group per project, and it’s this specific piece that each project deployed in NiFi should develop. This way, each project receives both bulletins and status related to the project and each project can implement its own workflow to deal with bulletin and status data.
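
To make this concrete, here is a minimal configuration sketch (the property and attribute names are the ones used in this example, not a general rule): in PartitionRecord, add a dynamic property named project whose value is the RecordPath /project; each outgoing flow file then carries a project attribute, and RouteOnAttribute can route on it with an expression such as ${project:equals('PRJ1')}, where PRJ1 is one of the project acronyms.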

Before presenting what monitoring I’m doing for the use case of this post, I’ll step back a little and present the use case itself and how I leverage the NiFi Registry to deploy my workflow from a development environment to a production environment.

  • Use case presentation

The use case itself is pretty simple and I won’t go too much into the details since it’s not the purpose of this blog. I’m using a HandleHttpRequest processor to listen for HTTP calls on a given port. Then, based on what URI has been called by the client, I’m routing the flow files to different process groups. I’m expecting to receive HTTP POST requests on specific endpoints, each one representing a type of data. The received data is expected to be XML and I’m checking that the data is valid according to an XSD schema. At this point, three options:

  1. if the client requests an endpoint that is not expected, I route the flow file to the HandleHttpResponse processor and return a 404 HTTP code.
  2. if the client posts data on the expected endpoint but the data is not valid, I route the flow file to the HandleHttpResponse processor and return a 400 HTTP code.
  3. if the received data is valid, then I return a 200 HTTP code.

Screen Shot 2018-02-07 at 9.56.24 PM

Screen Shot 2018-02-07 at 9.57.00 PM.png

What I’m doing with valid data in the process group does not really matter and I won’t describe that (this post is already way too long and I’m surprised you’re still reading…).

  • Deployment with NiFi Registry

Let’s try to keep things simple. The NiFi Registry is a web application living outside of NiFi. However, NiFi integrates with the NiFi Registry. This is done by going into the Controller Settings menu where there is a new tab to define the registries to connect with:

Screen Shot 2018-02-07 at 10.01.54 PM.png

With this integration between NiFi and NiFi Registry, you have new icons in the UI you need to get familiar with:

Screen Shot 2018-02-07 at 10.04.10 PM.png

From left to right:

  1. Number of versioned Process Groups that are up to date with the Registry
  2. Number of versioned Process Groups that are locally modified (you can commit your local changes to create a new version of your process group in the Registry)
  3. Number of versioned Process Groups that are stale (there is a more recent version of the Process Group in the Registry and you can update your local process group to this version)
  4. Number of versioned Process Groups that are stale and also with local changes
  5. Number of versioned Process Groups with a synchronization failure (the Registry is not available / unreachable, etc)

The interactions with the Registry through the NiFi UI can be done by right clicking on a process group or on the canvas inside a process group: this will show a Version menu with available commands.

Screen Shot 2018-02-07 at 10.09.36 PM.png

On the NiFi Registry side, here is what it looks like:

Screen Shot 2018-02-07 at 10.12.33 PM.png

In the Registry, it’s possible to create buckets (you’ll usually have one bucket per project). Then, from the NiFi UI, you’ll be able to start versioning a workflow in a bucket. To do that: right click on the process group, Version menu, start version control.

Screen Shot 2018-02-07 at 10.14.44 PM.png

Then you’ll be able to choose the bucket and to give some information about your workflow:

Screen Shot 2018-02-07 at 10.15.18 PM

You now have your workflow versioned in the registry!

Screen Shot 2018-02-07 at 10.16.39 PM

Now… how do I deploy my workflow on the production cluster?

I just have to drag and drop a process group component on the canvas, and I’ll see:

Screen Shot 2018-02-07 at 10.27.14 PM.png

And I can just click on Import to select the bucket, workflow and version I want:

Screen Shot 2018-02-07 at 10.28.26 PM

I now have my workflow imported on the production cluster. Two things to do:

  1. Update the variables of the process group that are used to externalize environment-specific properties of the workflow components. Note that changes to the variable values are not considered local changes from the Registry’s point of view because two instances of the same workflow version will have different values for different environments.
  2. Start the controller services and processors. When importing a new workflow for the first time, everything is stopped to let users change the variables. When updating an existing workflow to a new version, only the new components will need to be started (no service interruption).

To access variables, you can right click on the process group and click on the Variables menu:

Screen Shot 2018-02-07 at 10.46.10 PM.png

And then you can update the variables with values corresponding to your environment:

Screen Shot 2018-02-07 at 10.46.20 PM.png

If you have multi-levels process groups and if you defined variables at different levels, you will also need to update variables in sub-process groups.

  • Business monitoring for my use case

If you paid attention, in my screenshot of the NiFi Registry, you probably noticed two versioned workflows in the bucket dedicated to my use case. The other workflow is the one dedicated to the monitoring that you also noticed in a previous screenshot:

Screen Shot 2018-02-07 at 9.35.11 PM

Let’s quickly discuss what I suggest for project-specific monitoring. The first thing you might want to do is to receive email notifications when bulletins are generated by components of your workflow.

Then, what I usually suggest for per-project dashboarding is to export the statistics of specific points of the workflow to an external tool and create dashboards. As I said before, in my example, I’m exporting to the Ambari Metrics Service the last 5-minutes count and bytes sum of the flow files going through connections with a name suffixed with _GRAFANA. In my example, I renamed connections to automatically export statistics to get the number of HTTP code 200, 400 and 404:

Screen Shot 2018-02-07 at 10.40.04 PM.png

And here is my monitoring workflow for this specific use case:

Screen Shot 2018-02-07 at 10.42.43 PM.png

I route the monitoring data to have a specific processing for bulletins (email notifications in this case) and a specific processing for status. First, I use a PartitionRecord processor and a RouteOnAttribute to only extract status data for connections (I ignore data about processors, process groups, ports, etc).

Screen Shot 2018-02-07 at 10.45.39 PM.png

Then I split my JSON arrays and extract the field containing the name of the connection to only keep the connections suffixed by _GRAFANA. Then I transform the content to match the format expected by AMS and send the request to AMS (more details here).

And… that’s it… I’m now able to create Grafana dashboards for my specific use case:

Screen Shot 2018-02-07 at 10.53.06 PM.png

Another thing you might want to do is to define a dedicated Ambari Reporting Task with the UUID of the process group dedicated to the project. This way you have additional statistics about this specific process group: number of active threads, number of flow files queued, volume of data received, etc.

  • What about workflow update?

Ok… so I deployed my workflows in production and have great ways to monitor things. What if I have a new endpoint to handle in my workflow? Really simple! So far I only process “purchase” data; let’s add another process group to deal with “customer” data:

Screen Shot 2018-02-07 at 11.03.15 PM

I updated my RouteOnAttribute to accept a new endpoint and added a process group to process this data. I added my HTTP 200 & 400 relationships for this new type of data and I carefully renamed the output connections to automatically get this new data into my dashboards.

I can now see that my process group does have local changes:

Screen Shot 2018-02-07 at 11.06.34 PM.png

And I can commit the changes as a new version of my workflow:

Screen Shot 2018-02-07 at 11.07.19 PM.png

Screen Shot 2018-02-07 at 11.07.40 PM

I can now check that, on production’s side, there is a new version available and I can update to this new version:

Screen Shot 2018-02-07 at 11.10.20 PM.png

I can right click on the process group and update to the latest version:

Screen Shot 2018-02-07 at 11.11.23 PM.png

I select the version I want:

Screen Shot 2018-02-07 at 11.11.33 PM

And that’s it! Note that the update from a version A to a version B will stop and restart existing components if and only if modifications have been done on the components. In my case, the HandleHttpRequest has not been modified… consequently, the update of the workflow will not cause any service interruption. That’s pretty cool!

Also note that the update will not cause any data loss in case the update includes the deletion of connections that currently contain flow files.

And finally, I do have to update, if necessary, the variables of my newly added process group (for customer data) and to start it.

  • Conclusion

I’ve already talked about way too many things, but I hope it’ll get you excited about the NiFi Registry! Please install it and use it, that’s really going to ease your life. As always, I invite you to join the Apache NiFi community by subscribing to the mailing lists, submitting JIRAs for NiFi, MiNiFi Java/C++ and NiFi Registry, and contributing code on NiFi, MiNiFi Java/C++ and NiFi Registry.

Feel free to comment and ask your questions on this post! Thanks for reading me!

Authorizations with LDAP synchronization in Apache NiFi 1.4+

With the release of Apache NiFi 1.4.0, quite a lot of new features are available. One of them is the improved management of users and groups. Until this release, it was possible to configure an LDAP (or Active Directory) server, but it was only used during the authentication process. Once authenticated, it was necessary to have explicit policies for this user to access NiFi resources. And to create a policy for a given user, it was first necessary to manually create this user in NiFi’s users/groups management view. This time is now over. Users/groups management is now greatly simplified in terms of lifecycle management.

In addition to that, if you are using Apache Ranger as the external authorizer system for NiFi, you can now define rules based on LDAP groups. Before, you had to configure, in Ranger, rules explicitly based on users.

In this article, we are going to discuss how this is actually working and how you can configure it.

If you’re interested by the technical details of the implementation, you can look at the corresponding JIRAs (NIFI-4032, NIFI-4059, NIFI-4127) and Github pull requests (#1923, #1978, #2019).

Basically, the authorizer mechanism evolved quite a bit. Before NiFi 1.4, the authorizers.xml contained a list of configurations for any authorizer implementation you wanted to use to manage policies in NiFi. Unless you developed your own implementations, you had the choice between the FileAuthorizer (default implementation that stores the policies in a local file) and the RangerNiFiAuthorizer to use Apache Ranger as the external mechanism managing the policies.

If using the FileAuthorizer, the configuration looked like this (in a single node installation):

    <authorizer>
        <identifier>file-provider</identifier>
        <class>org.apache.nifi.authorization.FileAuthorizer</class>
        <property name="Authorizations File">./conf/authorizations.xml</property>
        <property name="Users File">./conf/users.xml</property>
        <property name="Initial Admin Identity”>admin</property>
        <property name="Legacy Authorized Users File"></property>
    </authorizer>

And we set the corresponding property in the nifi.properties file:

nifi.security.user.authorizer=file-provider

Starting with NiFi 1.4, the authorizers.xml file provides much more functionality (note that the changes are backward compatible and do not require any change on your side if you don’t want to change anything).

Let’s start by the new implementation of the authorizer: the Standard Managed Authorizer.

Note – there is also a new Managed Ranger Authorizer, but I won’t go into the details of this implementation in this blog. This implementation gives you the possibility to use Apache Ranger as the external system managing the authorizations but you still have access to the policies in the NiFi UI, and you can also manage additional users. It’s also this implementation that allows you to define group-based policies in Ranger.

It’s configured as below:

    <authorizer>
        <identifier>managed-authorizer</identifier>
        <class>org.apache.nifi.authorization.StandardManagedAuthorizer</class>
        <property name="Access Policy Provider">file-access-policy-provider</property>
    </authorizer>

This new implementation expects the identifier of the Access Policy Provider implementation you want to use. This new abstraction will be used to access and manage users, groups and policies… and to enforce policies when access to NiFi resources is requested. In the above example, our authorizer is identified with the name “managed-authorizer”, and that’s what you need to set in nifi.properties to use it:

nifi.security.user.authorizer=managed-authorizer

You can see that this authorizer expects a property Access Policy Provider with the identifier of the provider you want to use… Let’s move on to the Access Policy Provider. For now, there is a single implementation which is the FileAccessPolicyProvider. If you already know about the previous FileAuthorizer, you shouldn’t be very surprised by the expected properties. Here is a configuration example:

    <accessPolicyProvider>
        <identifier>file-access-policy-provider</identifier>
        <class>org.apache.nifi.authorization.FileAccessPolicyProvider</class>
        <property name="User Group Provider">file-user-group-provider</property>
        <property name="Authorizations File">./conf/authorizations.xml</property>
        <property name="Initial Admin Identity"></property>
        <property name="Legacy Authorized Users File"></property>

        <property name="Node Identity 1"></property>
    </accessPolicyProvider>

Note: as you can see the identifier of this Access Policy Provider is “file-access-policy-provider”, and that’s what we referenced in the property of the authorizer (see above).

As with the FileAuthorizer, you have the Initial Admin Identity property which lets you configure the identity of the user with the admin permissions to set the first policies after a fresh install of NiFi. As the documentation says:

Initial Admin Identity – The identity of an initial admin user that will be granted access to the UI and given the ability to create additional users, groups, and policies. The value of this property could be a DN when using certificates or LDAP, or a Kerberos principal. This property will only be used when there are no other policies defined. If this property is specified then a Legacy Authorized Users File cannot be specified.
NOTE: Any identity mapping rules specified in nifi.properties will also be applied to the initial admin identity, so the value should be the unmapped identity. This identity must be found in the configured User Group Provider.

Then you still have the Legacy Authorized Users File property in case you are upgrading from a NiFi 0.x install and you want to keep your previous policies in place.

You have the Authorizations File property that defines the path to the file that will locally store all the policies. You also find the Node Identity properties in case you are in a NiFi cluster. Nothing changed on this side, but just in case, a quick reminder from the official documentation:

Node Identity [unique key] – The identity of a NiFi cluster node. When clustered, a property for each node should be defined, so that every node knows about every other node. If not clustered these properties can be ignored. The name of each property must be unique, for example for a three nodes cluster: “Node Identity A”, “Node Identity B”, “Node Identity C” or “Node Identity 1”, “Node Identity 2”, “Node Identity 3”.
NOTE: Any identity mapping rules specified in nifi.properties will also be applied to the node identities, so the values should be the unmapped identities (i.e. full DN from a certificate). This identity must be found in the configured User Group Provider.

OK… now we have a new property called “User Group Provider” and that’s where we’re going to specify the identifier of the User Group Provider to be used. This User Group Provider is a new abstraction allowing you to define how users and groups should be automatically retrieved to then define policies on them.

You have multiple implementations available:
  • CompositeUserGroupProvider
  • CompositeConfigurableUserGroupProvider
  • LdapUserGroupProvider
  • FileUserGroupProvider

As the name suggests, the CompositeUserGroupProvider implementation allows you to use at the same time multiple implementations of the User Group Provider. This is very useful, mainly because when using NiFi in clustering mode, you need to define some policies for the nodes belonging to the cluster. And, as you may know, in NiFi, nodes are considered as users. In case your nodes are not defined in your LDAP or Active Directory, you will certainly want to use the composite implementation.

Now you need to consider the CompositeConfigurableUserGroupProvider implementation which is the one you will certainly want to use in most cases. This implementation will also provide support for retrieving users and groups from multiple sources. But the huge difference is that this implementation expects a single configurable user group provider. It means that users and groups from the configurable user group provider are configurable from the UI (as you did when creating users/groups from NiFi UI in previous versions). However, users/groups loaded from one of the other User Group Providers will not be.

Note that it’s up to each User Group provider implementation to define if it is configurable or not. For instance, the LDAP User Group Provider is not configurable: NiFi is not going to manage users and groups in the LDAP/AD server.

A typical configuration will be the definition of the Composite Configurable User Group provider with the File User Group provider as the configurable instance and one instance of the LDAP User Group provider:

    <userGroupProvider>
       <identifier>composite-configurable-user-group-provider</identifier>
       <class>org.apache.nifi.authorization.CompositeConfigurableUserGroupProvider</class>
       <property name="Configurable User Group Provider">file-user-group-provider</property>
       <property name="User Group Provider 1">ldap-user-group-provider</property>
    </userGroupProvider>

In this case, in the definition of the access policy provider, we need to change the property to use the correct user group provider:

    <accessPolicyProvider>
        <identifier>file-access-policy-provider</identifier>
        <class>org.apache.nifi.authorization.FileAccessPolicyProvider</class>
        <property name="User Group Provider">composite-configurable-user-group-provider</property>
        <property name="Authorizations File">./conf/authorizations.xml</property>
        <property name="Initial Admin Identity"></property>
        <property name="Legacy Authorized Users File"></property>

        <property name="Node Identity 1"></property>
    </accessPolicyProvider>

Now, let’s look at the File User Group provider. The objective of this provider is to provide the same functionalities as before: the user can manage users and groups from the UI and everything is stored locally in a file. Configuration looks like:

    <userGroupProvider>
       <identifier>file-user-group-provider</identifier>
       <class>org.apache.nifi.authorization.FileUserGroupProvider</class>
       <property name="Users File">./conf/users.xml</property>
       <property name="Legacy Authorized Users File"></property>

       <property name="Initial User Identity 1"></property>
    </userGroupProvider>

The initial user identities are users that should be automatically populated when creating the users.xml file for the first time. Typically you would define here your initial admin identity (if this user is not defined via the LDAP user group provider). From the documentation:

Initial User Identity [unique key] – The identity of a users and systems to seed the Users File. The name of each property must be unique, for example: “Initial User Identity A”, “Initial User Identity B”, “Initial User Identity C” or “Initial User Identity 1”, “Initial User Identity 2”, “Initial User Identity 3”.

NOTE: Any identity mapping rules specified in nifi.properties will also be applied to the user identities, so the values should be the unmapped identities (i.e. full DN from a certificate).

OK… now let’s move to the last user group provider: the one allowing an automatic synchronization of your users and groups with an LDAP/AD server. Here is the configuration part:

    <userGroupProvider>
       <identifier>ldap-user-group-provider</identifier>
       <class>org.apache.nifi.ldap.tenants.LdapUserGroupProvider</class>
       <property name="Authentication Strategy">START_TLS</property>

       <property name="Manager DN"></property>
       <property name="Manager Password"></property>

       <property name="TLS - Keystore"></property>
       <property name="TLS - Keystore Password"></property>
       <property name="TLS - Keystore Type"></property>
       <property name="TLS - Truststore"></property>
       <property name="TLS - Truststore Password"></property>
       <property name="TLS - Truststore Type"></property>
       <property name="TLS - Client Auth"></property>
       <property name="TLS - Protocol"></property>
       <property name="TLS - Shutdown Gracefully"></property>

       <property name="Referral Strategy">FOLLOW</property>
       <property name="Connect Timeout">10 secs</property>
       <property name="Read Timeout">10 secs</property>

       <property name="Url"></property>
       <property name="Page Size"></property>
       <property name="Sync Interval">30 mins</property>

       <property name="User Search Base"></property>
       <property name="User Object Class">person</property>
       <property name="User Search Scope">ONE_LEVEL</property>
       <property name="User Search Filter"></property>
       <property name="User Identity Attribute"></property>
       <property name="User Group Name Attribute"></property>
       <property name="User Group Name Attribute - Referenced Group Attribute"></property>

       <property name="Group Search Base"></property>
       <property name="Group Object Class">group</property>
       <property name="Group Search Scope">ONE_LEVEL</property>
       <property name="Group Search Filter"></property>
       <property name="Group Name Attribute"></property>
       <property name="Group Member Attribute"></property>
       <property name="Group Member Attribute - Referenced User Attribute"></property>
    </userGroupProvider>

You can find the usual parameters that you configured for the LDAP authentication part, but there are also a lot of new parameters to only synchronize specific parts of your remote LDAP/AD servers. The documentation says:

‘Url’ – Space-separated list of URLs of the LDAP servers (i.e. ldap://<hostname>:<port>).

‘Page Size’ – Sets the page size when retrieving users and groups. If not specified, no paging is performed.

‘Sync Interval’ – Duration of time between syncing users and groups (i.e. 30 mins). Minimum allowable value is 10 secs.

‘User Search Base’ – Base DN for searching for users (i.e. ou=users,o=nifi). Required to search users.

‘User Object Class’ – Object class for identifying users (i.e. person). Required if searching users.

‘User Search Scope’ – Search scope for searching users (ONE_LEVEL, OBJECT, or SUBTREE). Required if searching users.

‘User Search Filter’ – Filter for searching for users against the ‘User Search Base’ (i.e. (memberof=cn=team1,ou=groups,o=nifi) ). Optional.

‘User Identity Attribute’ – Attribute to use to extract user identity (i.e. cn). Optional. If not set, the entire DN is used.

‘User Group Name Attribute’ – Attribute to use to define group membership (i.e. memberof). Optional. If not set group membership will not be calculated through the users. Will rely on group membership being defined through ‘Group Member Attribute’ if set. The value of this property is the name of the attribute in the user ldap entry that associates them with a group. The value of that user attribute could be a dn or group name for instance. What value is expected is configured in the ‘User Group Name Attribute – Referenced Group Attribute’.

‘User Group Name Attribute – Referenced Group Attribute’ – If blank, the value of the attribute defined in ‘User Group Name Attribute’ is expected to be the full dn of the group. If not blank, this property will define the attribute of the group ldap entry that the value of the attribute defined in ‘User Group Name Attribute’ is referencing (i.e. name). Use of this property requires that ‘Group Search Base’ is also configured.

‘Group Search Base’ – Base DN for searching for groups (i.e. ou=groups,o=nifi). Required to search groups.

‘Group Object Class’ – Object class for identifying groups (i.e. groupOfNames). Required if searching groups.

‘Group Search Scope’ – Search scope for searching groups (ONE_LEVEL, OBJECT, or SUBTREE). Required if searching groups.

‘Group Search Filter’ – Filter for searching for groups against the ‘Group Search Base’. Optional.

‘Group Name Attribute’ – Attribute to use to extract group name (i.e. cn). Optional. If not set, the entire DN is used.

‘Group Member Attribute’ – Attribute to use to define group membership (i.e. member). Optional. If not set group membership will not be calculated through the groups. Will rely on group membership being defined through ‘User Group Name Attribute’ if set. The value of this property is the name of the attribute in the group ldap entry that associates them with a user. The value of that group attribute could be a dn or memberUid for instance. What value is expected is configured in the ‘Group Member Attribute – Referenced User Attribute’. (i.e. member: cn=User 1,ou=users,o=nifi vs. memberUid: user1)

‘Group Member Attribute – Referenced User Attribute’ – If blank, the value of the attribute defined in ‘Group Member Attribute’ is expected to be the full dn of the user. If not blank, this property will define the attribute of the user ldap entry that the value of the attribute defined in ‘Group Member Attribute’ is referencing (i.e. uid). Use of this property requires that ‘User Search Base’ is also configured. (i.e. member: cn=User 1,ou=users,o=nifi vs. memberUid: user1)

NOTE: Any identity mapping rules specified in nifi.properties will also be applied to the user identities. Group names are not mapped.

Please find more information in the documentation here.

If I have to summarize a bit the new authorizers.xml file structure, I could use this image:

 

Screen Shot 2017-12-22 at 6.25.03 PM

Now that we have discussed the technical details, let’s demo it. I’ll re-use Apache Directory Studio to set up a local LDAP server as I did in my article about LDAP authentication with NiFi. I’ll skip the details (please refer to that article if needed) and create the following structure:

Screen Shot 2017-12-22 at 4.20.38 PM.png

In a group, I have:

Screen Shot 2017-12-22 at 4.21.43 PM

And for a user, I have:

Screen Shot 2017-12-22 at 4.22.25 PM

Note that I’m using a very bad hack because, by default, the attribute ‘memberOf’ is not available unless you define additional object classes. As a workaround, I’m using the ‘title’ attribute to represent the membership of a user in different groups. It’s quick and dirty, but it’ll do for this demo.
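
For illustration only, here is roughly what one of my user entries could look like in LDIF form with this hack (the DN and attribute values are made up for the demo and must be consistent with the search base, filter and attributes configured in the authorizers.xml below):

dn: cn=user1,ou=people,dc=nifi,dc=com
objectClass: top
objectClass: person
objectClass: organizationalPerson
objectClass: inetOrgPerson
cn: user1
sn: user1
title: cn=nifi,ou=groups,dc=nifi,dc=com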

Now, here is my authorizers.xml file:

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<authorizers>
  <userGroupProvider>
    <identifier>file-user-group-provider</identifier>
    <class>org.apache.nifi.authorization.FileUserGroupProvider</class>
    <property name="Users File">./conf/users.xml</property>
    <property name="Legacy Authorized Users File"></property>
    <property name="Initial User Identity 1"></property>
  </userGroupProvider>

  <userGroupProvider>
    <identifier>ldap-user-group-provider</identifier>
    <class>org.apache.nifi.ldap.tenants.LdapUserGroupProvider</class>
    <property name="Authentication Strategy">SIMPLE</property>

    <property name="Manager DN">uid=admin,ou=system</property>
    <property name="Manager Password">secret</property>

    <property name="Referral Strategy">FOLLOW</property>
    <property name="Connect Timeout">10 secs</property>
    <property name="Read Timeout">10 secs</property>

    <property name="Url">ldap://localhost:10389</property>
    <property name="Page Size"></property>
    <property name="Sync Interval">30 mins</property>

    <property name="User Search Base">ou=people,dc=nifi,dc=com</property>
    <property name="User Object Class">person</property>
    <property name="User Search Scope">ONE_LEVEL</property>
    <property name="User Search Filter">(title=cn=nifi,ou=groups,dc=nifi,dc=com)</property>
    <property name="User Identity Attribute">cn</property>
    <property name="User Group Name Attribute">title</property>
    <property name="User Group Name Attribute - Referenced Group Attribute"></property>

    <property name="Group Search Base"></property>
    <property name="Group Object Class">group</property>
    <property name="Group Search Scope">ONE_LEVEL</property>
    <property name="Group Search Filter"></property>
    <property name="Group Name Attribute">cn</property>
    <property name="Group Member Attribute"></property>
    <property name="Group Member Attribute - Referenced User Attribute"></property>
  </userGroupProvider>

  <userGroupProvider>
    <identifier>composite-configurable-user-group-provider</identifier>
    <class>org.apache.nifi.authorization.CompositeConfigurableUserGroupProvider</class>
    <property name="Configurable User Group Provider">file-user-group-provider</property>
    <property name="User Group Provider 1">ldap-user-group-provider</property>
  </userGroupProvider>

  <accessPolicyProvider>
    <identifier>file-access-policy-provider</identifier>
    <class>org.apache.nifi.authorization.FileAccessPolicyProvider</class>
    <property name="User Group Provider">composite-configurable-user-group-provider</property>
    <property name="Authorizations File">./conf/authorizations.xml</property>
    <property name="Initial Admin Identity">admin</property>
    <property name="Legacy Authorized Users File"></property>
    <property name="Node Identity 1"></property>
 </accessPolicyProvider>

  <authorizer>
    <identifier>managed-authorizer</identifier>
    <class>org.apache.nifi.authorization.StandardManagedAuthorizer</class>
    <property name="Access Policy Provider">file-access-policy-provider</property>
  </authorizer>
</authorizers>

In this case, I decided to go through the users defined in my ‘people’ OU, to filter only the users belonging to the ‘nifi’ group and to use the ‘cn’ attribute as the username. I also specify that the ‘title’ attribute holds the group membership of a user. This way, NiFi is able to do the mapping between the users and groups. Note that my ‘admin’ user that I defined as my initial admin identity is in my LDAP server, and I don’t need to define it in the File User Group provider definition.

When starting NiFi and connecting to it as the ‘admin’ user, I can go in the Users view and I can find:

Screen Shot 2017-12-22 at 4.37.29 PM

Note that the button to add users and groups is available since I used the Composite Configurable User Group provider and defined the File User Group provider. That’s how I would specify my nodes as users if I don’t want to have the servers in my LDAP/AD.

Also note that this will automatically be synchronized with LDAP/AD based on the “Sync Interval” you specified in the authorizers configuration file.

Finally, as mentioned in the docs, remember that the order is important when using composite providers in case you have users/groups collisions between multiple sources.

With this configuration, I don’t have to care anymore about defining users and groups in NiFi and I can directly create my policies. It’s much more efficient to manage everything in case people leave or change projects. Cool, isn’t it?

Let me know if you have any comment/question.

XML data processing with Apache NiFi

Note – look at the new features in NiFi 1.7+ about XML processing in this post

I recently had to work on a NiFi workflow to process millions of XML documents per day. One of the steps is the conversion of the XML data into JSON. It raises the question of performance, and I will briefly present my observations in this post.

The two most natural approaches to convert XML data with Apache NiFi are:

  • Use the TransformXML processor with a XSLT file
  • Use a scripted processor or use a custom Java processor relying on a library

There are a few XSLT files available on the internet providing a generic way to transform any XML into a JSON document. That’s really convenient and easy to use. However, depending on your use case, you might need specific features.

In my case, I’m processing a lot of XML files based on the same input schema (XSD) and I want the output to be compliant with the same Avro schema (in order to use the record-oriented processors in NiFi). The main issue is to force the generation of an array when you only have a single element in your input.

XSLT approach

Example #1:

<MyDocument>
  <MyList>
    <MyElement>
      <Text>Some text...</Text>
      <RecordID>1</RecordID>
    </MyElement>
    <MyElement>
      <Text>Some text...</Text>
      <RecordID>2</RecordID>
    </MyElement>
  </MyList>
</MyDocument>

This XML document will be converted into the following JSON:

{
   "MyDocument" : {
     "MyList" : {
       "MyElement" : [ {
           "Text" : "Some text...",
           "RecordID" : 1
         }, {
           "Text" : "Some text...",
           "RecordID" : 2
         } ]
      }
   }
}

Example #2:

However, if you have the following XML document:

<MyDocument>
  <MyList>
    <MyElement>
      <Text>Some text...</Text>
      <RecordID>1</RecordID>
    </MyElement>
  </MyList>
</MyDocument>

The document will be converted into:

{
  "MyDocument" : {
    "MyList" : {
      "MyElement" : {
        "Text" : "Some text...",
        "RecordID" : 1
      }
    }
  }
}

Force array

And here is where the problems start… because we don’t have the same Avro schema. That is why I recommend using the XSLT file provided by Bram Stein here on Github. It provides a way to force the creation of an array. To do that, you need to add an attribute to the relevant elements of your XML input file. The attribute to add is

json:force-array="true"

But for this attribute to be correctly interpreted, you also need to declare the corresponding namespace:

xmlns:json="http://json.org/"

In the end, using ReplaceText processors with regular expressions, you need to have the following input (for the example #2):

<MyDocument xmlns:json="http://json.org/">
  <MyList>
    <MyElement json:force-array="true">
      <Text>Some text...</Text>
      <RecordID>1</RecordID>
    </MyElement>
  </MyList>
</MyDocument>

And this will give you:

{
  "MyDocument" : {
    "MyList" : {
      "MyElement" : [ {
        "Text" : "Some text...",
        "RecordID" : 1
      } ]
    }
  }
}

And now I do have the same schema describing my JSON documents. Conclusion: you need to use regular expressions to add a namespace declaration to the root tag of your document and to add the json:force-array attribute on every tag wrapping elements that should be part of an array, as sketched below.
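
As an illustration with the element names of example #2 (adapt the regular expressions to your own documents), this can be done with two ReplaceText processors using the Regex Replace strategy:

Search Value: <MyDocument>
Replacement Value: <MyDocument xmlns:json="http://json.org/">

Search Value: <MyElement>
Replacement Value: <MyElement json:force-array="true">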

Java approach

Now, let’s assume you’re not afraid of using scripted processors or developing your own custom processor. Then it’s really easy to have a processor doing the same thing using a Java library like org.json (note that this library is *NOT* Apache-friendly in terms of licensing, which is why the following code cannot be released with Apache NiFi). Here is an example of a custom processor doing the conversion. And here is a Groovy version for the ExecuteScript processor.
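
As a minimal standalone Groovy sketch of the conversion itself (the org.json version and the use of @Grab are my own assumptions for a quick local test; in NiFi you would rather point the ExecuteScript Module Directory property at the org.json jar):

@Grab('org.json:json:20180130')
import org.json.XML

def xml = '''<MyDocument>
  <MyList>
    <MyElement>
      <Text>Some text...</Text>
      <RecordID>1</RecordID>
    </MyElement>
  </MyList>
</MyDocument>'''

// Parse the XML and print its JSON representation with a 2-space indentation
println XML.toJSONObject(xml).toString(2)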

What about arrays with this solution? Guess what… It’s kind of similar: you have to use a ReplaceText processor before and after to ensure that arrays are arrays in the JSON output for any number of elements in your input. Also, you might have to do some other transformations like removing the namespaces or replacing empty strings

""

by

null

values (by default, everything will be converted to an empty string although you might want a null value instead).

To force arrays, the easiest approach is to double every tag that should be converted into an array. With the example #2, I transform my input to have:

<MyDocument>
  <MyList>
    <MyElement /><MyElement>
      <Text>Some text...</Text>
      <RecordID>1</RecordID>
    </MyElement>
  </MyList>
</MyDocument>

It’ll give me the following JSON:

{
  "MyDocument" : {
    "MyList" : {
      "MyElement" : [ "", {
        "Text" : "Some text...",
        "RecordID" : 1
      } ]
    }
  }
}

And, then, I can use another ReplaceText processor to remove the unwanted empty strings created by the conversion.

Conclusion: with the two approaches you’ll need to be a bit intrusive in your data to get the expected results. What about performance now?

Benchmark

I removed the ReplaceText processors from the equation as I usually need the same amount of regular expression work in both cases. I want to only focus on:

  • the TransformXML processor using a generic XSLT file
  • the ExecuteScript processor using the Groovy version of the conversion
  • the custom Java processor

I’ll compare the performances of each case using input of different sizes (data generated using a GenerateFlowFile processor) with default configuration (one thread, no change on run duration, etc) on my laptop.

Method: I’m generating as much data as possible (it’s always the same file during a single run) using the GenerateFlowFile processor. I wait at least 5 minutes to reach a constant rate of processing and I take the mean over a 5-minute window of constant processing.

Screen Shot 2017-09-07 at 12.12.12 AM.png

For each run, I’m only running the GenerateFlowFile, one of the three processors I’m benchmarking, and the UpdateAttribute (used to only drop the data).

The input data used for the benchmark is a fairly complex XML document with arrays of arrays, lots of elements in the arrays, deeply nested records, etc. To reduce the input size, I’m not changing the structure but only removing elements in the arrays. In other words: the schema describing the output data remains the same for each run.

Note that the custom Java/Groovy option is loading the full XML document in memory. To process very large XML documents, a streaming approach with another library would certainly be better suited.

Here are the results with input data of 5KB, 10KB, 100KB, 500KB and 1000KB. The below graph gives the number of XML files processed per second based on the input size for each solution.

Screen Shot 2017-09-07 at 10.16.45 PM

It’s clear that the custom Java processor is the most efficient one. The XSLT option is really nice when you want to do very specific transformations but it can quickly get slow. Using a generic XSLT file for XML to JSON transformation is easy and convenient but won’t be the most efficient option.

We can also notice that the Groovy option is a little bit less efficient than the Java one, but that’s expected. Nevertheless, the Groovy option provides pretty good performances and does not require building and compiling a custom processor: everything can be done directly from the NiFi UI.

To improve the performances, it’s then possible to play with the “run duration” parameter and increase the number of concurrent tasks. Actually it’s quite easy to reach the I/O limitations of the disks. Using a NiFi cluster and multiple disks for the content repository, it’s really easy to process hundreds of millions of XML documents per day.

If we display the performance ratio based on the file size between the XSLT solution and the Java based solution, we have:

Screen Shot 2017-09-07 at 10.28.46 PM

We can see that with very small files, the processing using the Java-based processor is about 13x more efficient than the XSLT approach. But with files over 100KB, the Java solution is about 26x more efficient. That’s because the NiFi framework is doing a few things before and after a flow file has been processed. When processing thousands of flow files per second, this creates a small overhead that explains the difference.

XML Record Reader

For a few versions now, Apache NiFi has shipped record-oriented processors, which provide a very powerful way to process record-oriented data. In particular, they allow users to process batches of data instead of handling files one at a time, which is both robust and fast. As I write this post there is no reader for XML data yet, but there is a JIRA for it and it would bring a few interesting features:

  • By using a schema describing the XML data, it’d remove the need to use ReplaceText processors to handle the “array problem”.
  • It’d give the possibility to merge XML documents together to process much more data at once, providing even better performance.

This effort can be tracked under NIFI-4366.

As usual, feel free to post any comment/question/feedback.

https://gist.github.com/pvillard31/408c6ba3a9b53880c751a35cffa9ccea

Monitoring NiFi – Scripted Reporting Task

Note – This article is part of a series discussing subjects around NiFi monitoring.

In the new release of Apache NiFi (1.2.0), you can now develop Scripted Reporting Tasks thanks to NIFI-1458. It is the same approach as with the ExecuteScript processor, for which you have tons of great examples here.

You might also want to read the following posts:

With the ScriptedReportingTask you can define your own implementation of the onTrigger() method and get access to:

  • ReportingContext context (which gives you access to various information such as events, provenance, bulletins, controller services, process groups, etc)
  • VirtualMachineMetrics vmMetrics (to access the metrics of the JVM)
  • ComponentLog log (if you want to log messages)

Let’s start with a very easy example: I want to log the number of daemon threads inside my JVM every minute. Here is my code:

log.info("Thread count = " + vmMetrics.daemonThreadCount())

Screen Shot 2017-05-12 at 5.43.45 PM.png

And I can check in my nifi-app.log file that I do have:

2017-05-12 17:43:27,639 INFO [Timer-Driven Process Thread-5] o.a.n.r.script.ScriptedReportingTask ScriptedReportingTask[id=fd1668eb-015b-1000-1974-5ef96e1f9a8b] Thread count = 29

OK… now I won’t go into the details of all the information you can access using the “context” variable but let’s try another example…

I want to send a POST request over HTTP containing a JSON representation of the summary of my root process group.

I’m not really used to Groovy so please excuse my coding style ;-). But here is working code (available here as well):

// Load the classes provided through the module directory property (see below)
def json = Class.forName("javax.json.Json")
def httpClients = Class.forName("org.apache.http.impl.client.HttpClients")
def contentType = Class.forName("org.apache.http.entity.ContentType")

// Get the status of the root process group from the reporting context
def status = context.getEventAccess().getControllerStatus();
def factory = json.createBuilderFactory(Collections.emptyMap());
def builder = factory.createObjectBuilder();

// Build the JSON representation of the root process group status
builder.add("componentId", status.getId());
builder.add("bytesRead", status.getBytesRead());
builder.add("bytesWritten", status.getBytesWritten());
builder.add("bytesReceived", status.getBytesReceived());
builder.add("bytesSent", status.getBytesSent());
builder.add("bytesTransferred", status.getBytesTransferred());
builder.add("flowFilesReceived", status.getFlowFilesReceived());
builder.add("flowFilesSent", status.getFlowFilesSent());
builder.add("flowFilesTransferred", status.getFlowFilesTransferred());
builder.add("inputContentSize", status.getInputContentSize());
builder.add("inputCount", status.getInputCount());
builder.add("outputContentSize", status.getOutputContentSize());
builder.add("outputCount", status.getOutputCount());
builder.add("queuedContentSize", status.getQueuedContentSize());
builder.add("activeThreadCount", status.getActiveThreadCount());
builder.add("queuedCount", status.getQueuedCount());

// Send the JSON payload with an HTTP POST request
def requestEntity = new org.apache.http.entity.StringEntity(builder.build().toString(), contentType.APPLICATION_JSON);
def httpclient = httpClients.createDefault();
def postMethod = new org.apache.http.client.methods.HttpPost("http://localhost:9999/rootStatus");
postMethod.setEntity(requestEntity);
httpclient.execute(postMethod);
httpclient.close();

Note – this should be improved to, for instance, properly handle potential exceptions.
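
As a rough sketch of such an improvement (still simplified), the HTTP call could be wrapped in a try/catch/finally block so that a failure is logged and the client is always closed:

// same variables as in the script above
def httpclient = httpClients.createDefault();
try {
    def postMethod = new org.apache.http.client.methods.HttpPost("http://localhost:9999/rootStatus");
    postMethod.setEntity(requestEntity);
    def response = httpclient.execute(postMethod);
    log.info("Root status POST returned " + response.getStatusLine().getStatusCode());
} catch (Exception e) {
    log.error("Unable to send the root process group status", e);
} finally {
    httpclient.close();
}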

To get this code working, I also have to specify the required dependencies (JSON, Apache HTTP, etc). For that, in the module directory property of the reporting task, I gave the following paths (because I’m lazy, I’m pointing to more dependencies than strictly required):

  • /var/lib/nifi/work/nar/extensions/nifi-standard-nar-1.2.0.nar-unpacked/META-INF/bundled-dependencies/
  • /var/lib/nifi/work/nar/extensions/nifi-site-to-site-reporting-nar-1.2.0.nar-unpacked/META-INF/bundled-dependencies/

In my example I’m sending my JSON payload with a POST HTTP request to localhost on port 9999 with the path rootStatus. To receive the request, I started a ListenHttp processor with the following configuration:

Screen Shot 2017-05-12 at 8.00.31 PM.png
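
For reference, based on the values mentioned above, the ListenHTTP configuration boils down to something like this (other properties left at their defaults):

Listening Port: 9999
Base Path: rootStatus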

Once my reporting task is started, I start receiving the information as flow files:

Screen Shot 2017-05-12 at 8.08.55 PM.png

The ScriptedReportingTask allows you to quickly develop a proof of concept to send information to your internal systems using the interfaces you want. However, depending on your needs, it might be more interesting to develop your own reporting task in Java and to build the corresponding NAR: it will give you more flexibility/options (you’ll be able to implement more interfaces) and better performance.

As usual, feel free to ask questions and comment on this post.

Monitoring NiFi – Ambari & Grafana

Note – This article is part of a series discussing subjects around NiFi monitoring.

When using Apache NiFi (note that version 1.2.0 is now released!) as part of HDF, a lot of things are simplified using Apache Ambari to deploy NiFi and manage its configuration. Also, using the Ambari Metrics service and Grafana, you have a way to easily and visually monitor NiFi performance. You can also use Apache Ranger to centralize authorization management for multiple components (NiFi, Kafka, etc) in one single place.

This article will discuss how you can use Ambari Metrics and Grafana to improve your NiFi monitoring. Let’s start with a quick overview of AMS (Ambari Metrics System). By default this service runs a Metrics Collector with an embedded HBase instance (and a ZooKeeper instance) to store all the metrics, and Ambari also deploys Metrics Monitor instances on all the nodes of the cluster. The monitors collect metrics at the system level and send them to the collector. The collector also exposes a REST API, and that’s what NiFi uses with the AmbariReportingTask.

GrafanaBlogOverview

Source and documentation are on the Hortonworks website here.

When using HDF, the Ambari reporting task should already be up and running for you. If not, you can add it, configure it with a frequency of one minute (this does matter) and use the following parameters:

Screen Shot 2017-05-11 at 9.30.46 AM

Note that “ambari.metrics.collector.url” is an environment variable already set for you when Ambari starts NiFi. You could also directly give the address; in my case:

http://pvillard-hdf-1:6188/ws/v1/timeline/metrics
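
For reference, the reporting task configuration looks roughly like this (property names and default values may vary slightly depending on the NiFi version):

Metrics Collector URL: ${ambari.metrics.collector.url}
Application ID: nifi
Hostname: ${hostname(true)}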

Once this reporting task is up and running, you should be able to see the metrics on the NiFi service page in Ambari:

Screen Shot 2017-05-11 at 9.38.57 AM.png

Also, you can go into Grafana to display dashboards with the metrics of your components. You have pre-configured dashboards and here is the one for NiFi:

Screen Shot 2017-05-11 at 9.46.03 AM.png

Now, all the metrics we have here are at cluster level. We are not able to display metrics for specific workflows. With the latest release of Apache NiFi (1.2.0), there is now an optional parameter in the AmbariReportingTask to specify a process group ID. This way, by creating a second reporting task (keep the one providing cluster-level metrics) and by specifying the ID of a specific process group, you can actually create your Grafana dashboards at workflow level.

Let’s say I’ve the following workflow:

Screen Shot 2017-05-11 at 9.52.49 AM

And inside my process group, I have:

Screen Shot 2017-05-11 at 9.52.59 AM.png

Now, since my process group has the ID “75973b6e-2d38-1cf3-ffff-fffffdea8cbc”, I can define the following Ambari reporting task:

Screen Shot 2017-05-11 at 9.54.50 AM.png

Note – you must keep “nifi” as the Application ID as it has to match the configuration of the Ambari Metrics System.

Once your reporting task is running, in Grafana, you can create your own dashboard for this workflow and display the metrics you want:

Screen Shot 2017-05-11 at 10.08.59 AM.png

For my Kafka example, here is the dashboard I defined:

Screen Shot 2017-05-11 at 10.39.47 AM.png

In this example, I can see that my workflow is running fine but the free disk space on one of my nodes is decreasing very quickly. When the disk is completely full, back pressure will be enabled in my workflow and no more data will be sent to Kafka; instead, data will queue up in NiFi.

This simple example gives me a lot of information:

  • Everything is default configuration in Ambari and I chose my three NiFi nodes to also host Kafka brokers. By default, for Kafka, the replication factor is set to 1, the number of partitions is set to 1 and automatic topic creation is allowed (that’s why I didn’t need to create the topic before starting my workflow). Because of these default parameters, all of the data is sent to only one Kafka broker (pvillard-hdf-2), and that’s why the disk space is quickly decreasing on this node since my three NiFi nodes are sending data to this broker.
  • Also, we clearly see that it’s not a good idea to collocate NiFi and Kafka on the same nodes since they are both I/O intensive. In this case they are using the same disk… and we can see that the task duration (for NiFi) is clearly higher on the Kafka node receiving the data (pvillard-hdf-2). Long story short: keep NiFi and Kafka on separate nodes (or at the very least on different disks).

HDF and the Ambari Metrics System give you the ability to create relevant custom dashboards for specific use cases. They also allow you to mix information from Kafka, from NiFi and from the hosts to have all the needed information in a single place.

Also, by using the REST API of the Metrics Collector (you may be interested in this article), you can send your own data (not only the data gathered at the process group level) to add more information to your dashboards. An example that comes to mind would be to send the lineage duration (see Monitoring of Workflow SLA) at the end of the workflow, using an InvokeHTTP processor to send a JSON payload with a POST request to the API endpoint.

Let’s say I want to monitor how long it takes between my GenerateFlowFile and the end of my workflow to check if some particular events are taking longer. Then I could have something like:

Screen Shot 2017-05-11 at 5.58.02 PM.png

What am I doing here? I want to send to AMS information about the lineage duration of the flow files I send into my Kafka topic. However I don’t want to send the duration of every single event (that’s not really useful and it would generate a lot of requests/data). Instead I want to make an API call only once per minute. The idea is to compute the mean and max of the lineage duration over a rolling window of one minute and to only send these two values to AMS.

I could use the new AttributeRollingWindow processor but it is not as fast as the PublishKafka and I don’t want to generate back pressure in my relationships. So I use the InvokeScriptedProcessor to build my own rolling processor (it’s faster because I am not using any state information):

  • this processor takes a frequency duration as a parameter (that I’ll set to 1 minute in this example)
  • for every flow file coming in, it extracts the lineage start date to compute the max and mean lineage duration over the rolling window. If the last flow file sent to the success relationship was less than one minute ago, the current flow file is routed to the drop relationship (which I set to auto-terminated); if it was more than one minute ago, the attributes of the current flow file are updated with the mean and max computed over all the flow files seen since the last “success” flow file, and it is routed to the success relationship

Since flow files are coming into my processor at a high rate, I know that it will release one flow file every minute with the mean and max of the lineage duration for the flow files of the last minute.
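
Here is a minimal sketch of that logic, not the actual script (which is linked below); the names of the variables, attributes and relationships are illustrative:

// sketch of the onTrigger() logic of the scripted rolling processor
def flowFile = session.get()
if (flowFile == null) return

long now = System.currentTimeMillis()
long duration = now - flowFile.getLineageStartDate()

// accumulators kept as instance fields of the scripted processor
count++
sum += duration
max = Math.max(max, duration)

if (now - lastRelease >= frequencyMillis) {
    // once per "frequency duration", emit a flow file carrying the mean and max as attributes
    flowFile = session.putAttribute(flowFile, "lineage.duration.mean", String.valueOf(sum / count))
    flowFile = session.putAttribute(flowFile, "lineage.duration.max", String.valueOf(max))
    session.transfer(flowFile, REL_SUCCESS)
    lastRelease = now
    count = 0; sum = 0; max = 0
} else {
    // everything else goes to the auto-terminated drop relationship
    session.transfer(flowFile, REL_DROP)
}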

Then I use a ReplaceText processor to construct the JSON payload that I’ll send to the Metrics Collector using the InvokeHttp processor.

Here is the configuration of the InvokeScriptedProcessor:

Screen Shot 2017-05-11 at 7.54.00 PM

The Groovy script used can be found here.

Then I create the JSON payload with the ReplaceText processor:

Screen Shot 2017-05-11 at 7.56.33 PM.png

Note that I use the ID of the processor as the “instanceid” attribute in the JSON.
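
For reference, the payload expected by the Metrics Collector looks roughly like this (the values here are purely illustrative; check the Ambari Metrics REST API documentation for the exact format):

{
  "metrics" : [ {
    "metricname" : "lineage_duration_mean",
    "appid" : "nifi",
    "instanceid" : "<ID of the scripted processor>",
    "hostname" : "pvillard-hdf-1",
    "starttime" : 1494514740000,
    "metrics" : {
      "1494514740000" : 150
    }
  } ]
}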

Then, I use the InvokeHttp processor (with a scheduling/frequency of 1 minute):

Screen Shot 2017-05-11 at 7.57.45 PM.png
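
Again for reference, given the collector address mentioned earlier, the InvokeHttp configuration is essentially the following (values are assumptions based on this article):

HTTP Method: POST
Remote URL: http://pvillard-hdf-1:6188/ws/v1/timeline/metrics
Content-Type: application/json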

Now, I can use this information to build the corresponding graph in my Grafana dashboard:

Screen Shot 2017-05-11 at 7.59.27 PM

I can see that, on average, it takes about 150 milliseconds to generate my flow file, publish it to my Kafka topic and get it into my scripted processor. I could also generate one metric per host of my cluster to check whether a node is performing badly compared to the others.

Now you can easily send your custom data into AMS and create dashboards for your specific use cases and workflows.

As usual, feel free to ask questions and comment on this post.