Discussion around Ranger policies for HDFS

OK… so… Apache Ranger is the piece you want to use to define authorizations in your cluster (and, most importantly, to get audits for all the policies you define). Not only for HDFS but for all the components you’re using. In this post I just want to discuss policies to secure how HDFS is used.

More specifically, here is my scenario:

  • I have a project called ‘myProject’ consisting of a Spark job accessing and processing sensitive data sitting in HDFS and the result of this processing is written back into HDFS as well. This data is so sensitive that users should not be able to access it unless specific permissions are granted. The job is running with the user ‘myProjectUser’.

Now… let’s say the team working on the project does not have permissions to access the production data and has mock datasets for development and staging environments. OK great, that’s the usual situation. BUT… we have a rogue user, John Doe, in the development team and, somehow, he is able to introduce some lines of code so that the Spark job writes the result in two places: the expected location, and another, unexpected place on the production cluster that John Doe can access. How can you prevent that?

Well… the immediate answer is a proper development process with pull requests and peer reviewing on any commit pushed to the code repository you’re using. But… let’s say that’s not enough, or, that two rogue users agreed to commit that code through a pull request. How can you set the Ranger policies to prevent this specific scenario?

Note – for this post, when I’m saying a folder has hdfs:hadoop:755, it means that chmod 755 and chown hdfs:hadoop have been set on that folder.

Let’s talk quickly about how Ranger works and, more specifically, how it works with HDFS. By default, Ranger allows you to define “allow” policies, meaning that you grant specific access (read, write, execute) to a specific resource (file, folder) in HDFS. But something you need to recall is that, by default, the native HDFS permissions (the POSIX-style mode bits plus any HDFS ACLs, which I’ll simply call “HDFS ACLs” in this post) apply in combination with Ranger policies: both are used to check whether a user can access a resource. It means that, if you have:

  • a folder /myFolder with hdfs:hadoop:755 in HDFS
  • an “allow” policy saying that people belonging to group “sales” have RWX access to /myFolder

In reality, this “allow” policy does not prevent any access from people not belonging to the “sales” group. Since chmod is 755 anyone having access to HDFS can read the data inside that folder.

However, if you set chmod 000 to that folder, then, only the “allow” policy will apply and only people belonging to the “sales” group will have access to the folder.
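To make the 755 versus 000 difference concrete, here is a minimal Python sketch of the classic owner/group/other check on mode bits. It is only an illustration: the function name `mode_allows` and the sample users are made up, and it deliberately ignores superusers, extended ACL entries and Ranger itself.

```python
def mode_allows(mode, owner, group, user, user_groups, want):
    """Return True if `user` gets every permission in `want` ('r', 'w', 'x')
    from the mode bits alone (no Ranger, no superuser, no extended ACLs)."""
    if user == owner:
        bits = (mode >> 6) & 0o7   # owner triplet
    elif group in user_groups:
        bits = (mode >> 3) & 0o7   # group triplet
    else:
        bits = mode & 0o7          # "other" triplet
    needed = 0
    for flag, value in (("r", 4), ("w", 2), ("x", 1)):
        if flag in want:
            needed |= value
    return (bits & needed) == needed


# /myFolder with hdfs:hadoop:755: a user outside "hadoop" can still read and traverse it.
print(mode_allows(0o755, "hdfs", "hadoop", "alice", {"marketing"}, "rx"))  # True
# Same folder with chmod 000: the mode bits grant nothing, so only a Ranger policy can.
print(mode_allows(0o000, "hdfs", "hadoop", "alice", {"marketing"}, "rx"))  # False
```

With 755, the “allow” policy for “sales” adds nothing for readers because the “other” triplet already grants read and execute; with 000, the policy becomes the only way in.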

Note – that’s not entirely true since there is one exception: the HDFS superusers you defined. Any HDFS superuser can access any folder. When a request is sent to the Name Node (NN), the NN checks the identity of the requester and, if it’s a superuser, skips the authorization parts of the code. If you want to prevent HDFS superusers from accessing some data, then you probably want to look at Ranger KMS, which helps ensure segregation of duties by encrypting the data in HDFS using encryption keys only available to specific users.

Note – it’s possible to prevent Ranger from falling back on HDFS ACLs by setting
xasecure.add-hadoop-authorization=false
but it means you have to set Ranger policies for everything. That’s doable but can require some effort to do properly.

Now that you’re aware of that behavior, you probably want to look at this post with some best practices about Ranger and HDFS. The first thing I’d recommend is to change the default umask from 022 to 077 to ensure that, by default, a newly created file/directory can only be read by the owner unless a Ranger policy states otherwise.
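As a quick sanity check of what the umask change does, here is the arithmetic as a small Python sketch. It assumes the usual base modes (777 for directories, 666 for files) from which the umask bits are cleared; the helper name `default_mode` is mine.

```python
def default_mode(umask, is_dir):
    """Mode of a newly created entry: base mode with the umask bits cleared."""
    base = 0o777 if is_dir else 0o666
    return base & ~umask


for umask in (0o022, 0o077):
    print(f"umask {umask:03o}: dir {default_mode(umask, True):03o}, "
          f"file {default_mode(umask, False):03o}")
# umask 022: dir 755, file 644
# umask 077: dir 700, file 600
```

So with 077, nothing is granted to group or other at creation time, and any wider access has to come from a Ranger policy.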

Then, you want to take care of some specific folders in HDFS such as the ones under /apps (folders used by Hive, HBase, Zeppelin, etc). The idea is to set chmod 000 on the folders so that HDFS ACLs do not apply and only Ranger policies are enforced. Let’s say you have /apps/hive/warehouse where all your Hive databases are stored. If you set chmod 000 on /apps/hive, then no one can go inside a subfolder of /apps/hive unless there is an “allow” policy in Ranger for that user.

OK, that’s great… but let’s go back to our scenario where we have a rogue user… Our John Doe can SSH to an edge node on the production cluster where he has his home directory on HDFS (/user/jdoe with jdoe:jdoe:700). We could say that we are safe: the job running as myProjectUser does not have permissions to write in /user/jdoe. But there is nothing preventing jdoe from changing the permissions on his home directory with a chmod 777 (however, only an HDFS superuser can execute a chown command to change ownership of a directory). Once John Doe has changed the permissions, the job can write into that folder and then John can do whatever he wants with the data… even though you’re defining “allow” policies in Ranger.

Let’s have a look at the folders potentially exposing this issue. We have:

  • /user
  • /tmp
  • /app-logs
  • /mr-history
  • /spark-history
  • /spark2-history

The above folders (except /user) have, by default, a chmod 777 technically allowing anyone to create a folder there and use that folder for an unexpected purpose. Let’s have a look and let’s try to ensure no one can create something we don’t want.

Example – /spark-history & /spark2-history (part 1)

That’s where the Spark History Server(s) are going to store data about ongoing Spark jobs, and completed jobs based on retention policies you defined. If we look at the structure:

/spark-history is spark:hadoop:777

and then we have folders such as:

/spark-history/<folder> with <user running the job>:hadoop:770

Folders are complying with a naming convention but we don’t really care here.

The objective is to get rid of the chmod 777 at the top level BUT… the issue here is that a user running a job needs the right to create a directory. So, basically, anyone needs the right to create a directory in /spark-history. I can’t just set chmod 000 and define “allow” policies… And that’s where Ranger introduced a very nice feature: the “deny” policies.

The “deny” policies in Ranger

If you want all the details, have a look here and here. The main idea is to add the option, when defining a policy, to:

  • exclude groups or users from the allow policy
  • deny access to users/groups on the resource
  • exclude groups or users from the deny policy

Then, here is how the policy is evaluated in the authorizer:

Ranger-Policy-Evaluation-Flow.png

Basically, if there is no “deny” policy, the most permissive access between Ranger policies and HDFS ACLs will be used to grant access to the resource. In other words, unless you specifically defined a “deny” policy that applies for the accessed resource, the HDFS ACLs will always be considered for authorization.
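Here is a minimal Python sketch of that evaluation order as I understand it: deny first (unless excluded), then allow (unless excluded), then the fallback on HDFS permissions. The `Policy` class and `is_allowed` function are simplified stand-ins I made up for illustration (one policy, users and groups mixed in the same sets), not Ranger’s actual classes.

```python
from dataclasses import dataclass, field

PUBLIC = "public"  # convention: matches everyone


@dataclass
class Policy:
    """Simplified stand-in for the Ranger policy matching a resource."""
    allow: set = field(default_factory=set)
    allow_exclude: set = field(default_factory=set)
    deny: set = field(default_factory=set)
    deny_exclude: set = field(default_factory=set)


def _matches(user, groups, principals):
    """True if the user, one of its groups, or 'public' is listed."""
    return PUBLIC in principals or user in principals or bool(groups & principals)


def is_allowed(user, groups, policy, hdfs_allows, hdfs_fallback=True):
    """hdfs_allows: result of the native HDFS check.
    hdfs_fallback: models xasecure.add-hadoop-authorization."""
    # 1. A matching deny wins, unless the user is excluded from the deny.
    if _matches(user, groups, policy.deny) and not _matches(user, groups, policy.deny_exclude):
        return False
    # 2. A matching allow grants access, unless the user is excluded from it.
    if _matches(user, groups, policy.allow) and not _matches(user, groups, policy.allow_exclude):
        return True
    # 3. No Ranger decision: HDFS permissions decide, if the fallback is enabled.
    return hdfs_allows if hdfs_fallback else False


no_policy = Policy()
locked = Policy(allow={"hadoop", "myProjectUser"},
                deny={PUBLIC},
                deny_exclude={"hadoop", "myProjectUser"})

print(is_allowed("jdoe", {"dev"}, no_policy, hdfs_allows=True))            # True
print(is_allowed("jdoe", {"dev"}, locked, hdfs_allows=True))               # False
print(is_allowed("myProjectUser", {"proj"}, locked, hdfs_allows=False))    # True
```

The first call is the important one: without a “deny”, a permissive chmod is enough to get in, whatever the “allow” policies say.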

Enabling the “deny” policies

By default, the “deny” policies are not available in the Ranger UI. It’s mainly because the concept can be hard to understand and things can quickly become a mess when using the “deny” policies. Also, for most of the users, this feature will never be used. One needs to be careful and rigorous when using that feature.

To enable it, retrieve the HDFS service definition from the Ranger Admin server:

$ curl -k -u <user> https://<ranger_admin_server>:6182/service/public/v2/api/servicedef/1 > hdfs.json

Then, have a look at the content:

$ cat hdfs.json
{...,"isEnabled":true,"name":"hdfs","options":{"enableDenyAndExceptionsInPolicies":"false"}...}

In the “options” field, you can add (or modify it if it’s already there):

"options":{"enableDenyAndExceptionsInPolicies":"true"}

Then you just have to update the service definition:

$ curl -k -u <user> -X PUT -H "Accept: application/json" -H "Content-Type: application/json" -d @hdfs.json https://<ranger_admin_server>:6182/service/public/v2/api/servicedef/1
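If you’d rather script the edit of hdfs.json than change it by hand before sending the PUT, something like the following Python sketch would do. The function names are mine; the only thing taken from the service definition is the “options” key shown above, and the value is kept as a string as in the dump.

```python
import json


def enable_deny_policies(servicedef):
    """Return a copy of the service definition with deny/exception policies enabled."""
    updated = dict(servicedef)
    options = dict(updated.get("options") or {})
    options["enableDenyAndExceptionsInPolicies"] = "true"  # a string, as in the dump
    updated["options"] = options
    return updated


def rewrite_servicedef(path):
    """Apply the change to a downloaded file (e.g. hdfs.json) in place."""
    with open(path) as fh:
        servicedef = json.load(fh)
    with open(path, "w") as fh:
        json.dump(enable_deny_policies(servicedef), fh)


sample = {"name": "hdfs", "isEnabled": True,
          "options": {"enableDenyAndExceptionsInPolicies": "false"}}
print(json.dumps(enable_deny_policies(sample)["options"]))
# {"enableDenyAndExceptionsInPolicies": "true"}
```

Calling `rewrite_servicedef("hdfs.json")` between the GET and the PUT replaces the manual edit.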

Variables in Ranger

In addition to the “deny” policies (introduced in Ranger 0.6), we’re going to use the variables (introduced in Ranger 0.7). We can use:

  • {OWNER} – owner of the resource
  • {USER} – user accessing the resource

It’s a very convenient way to define policies without naming specific users.

Example – /spark-history & /spark2-history (part 2)

We now have the tools to secure our /spark-history folder by creating the following rules:

  • on /spark-history
    • Allow policy for group “public”, RWX permissions, non-recursive. This rule allows us to set chmod 000 in HDFS to keep things clean.

policy_spark_folder.png

  • on /spark-history/*
    • Allow policy for group “hadoop” and user “{OWNER}”, RWX permissions.
    • Deny policy for group “public”, RWX permissions. For any resource in /spark-history, no one has access.
    • Exclude deny policy for group “hadoop” and user “{OWNER}”, RWX permissions. For any resource in /spark-history, only users of the “hadoop” group (such as spark), and owner of the resources have RWX permissions.

policy_spark_content.png

Note – “public” group is a convention meaning anyone.
Note – same approach applies for /spark2-history

With the policies we defined, anyone can create a folder, but only the owner of that folder can access the data inside, or write data inside. We have now secured our /spark-history and /spark2-history folders.
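If you prefer to create these policies through the REST API rather than the UI, the rule on /spark-history/* could be described with a payload like the one built below. This is a sketch under assumptions: the field names follow the policy JSON of Ranger’s public v2 API as I understand it (compare with a policy exported from your own Ranger before using it), and the service name, the policy name and the recursive flag are hypothetical choices of mine.

```python
import json


def rwx_item(users=(), groups=()):
    """One policy item covering read/write/execute for the given users and groups."""
    return {
        "users": list(users),
        "groups": list(groups),
        "accesses": [{"type": t, "isAllowed": True} for t in ("read", "write", "execute")],
        "delegateAdmin": False,
    }


def spark_history_content_policy(service_name, path="/spark-history/*"):
    """Allow hadoop + {OWNER}, deny public, exclude hadoop + {OWNER} from the deny."""
    trusted = rwx_item(users=["{OWNER}"], groups=["hadoop"])
    return {
        "service": service_name,              # hypothetical HDFS service name in Ranger
        "name": "spark-history content",      # hypothetical policy name
        "isEnabled": True,
        "resources": {"path": {"values": [path], "isRecursive": True, "isExcludes": False}},
        "policyItems": [trusted],                          # allow
        "denyPolicyItems": [rwx_item(groups=["public"])],  # deny
        "denyExceptions": [trusted],                       # exclude from deny
        "allowExceptions": [],
    }


policy = spark_history_content_policy("mycluster_hadoop")
print(json.dumps(policy["denyPolicyItems"][0]["groups"]))  # ["public"]
```

The same builder works for /spark2-history by passing a different `path`.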

Example – /mr-history

Let’s have a look at the folder structure for /mr-history:

/mr-history with mapred:hadoop:777

/mr-history/done with mapred:hadoop:777
/mr-history/tmp with mapred:hadoop:777

/mr-history/tmp/<user> with <user>:hadoop:770

/mr-history/done/<year> with mapred:hadoop:770
/mr-history/done/<year>/<month> with mapred:hadoop:770
/mr-history/done/<year>/<month>/<day> with mapred:hadoop:770
/mr-history/done/<year>/<month>/<day>/<folderID> with mapred:hadoop:770

And in that last folder, we only have files (usually an XML file and a JHIST file for each job) with <user running the job>:hadoop:770.

When a job is launched by myUser, a folder (if not already existing) named myUser is created in /mr-history/tmp and this folder will be used to store metadata files about the running job. Once the job is completed, the user mapred will move the file from that folder into the corresponding /mr-history/done subfolder.

In conclusion, we can set chmod 000 on /mr-history and define the following rules:

  • on /mr-history/done
    • grant RWX permissions to group “hadoop” and user “activity_analyzer” (if you’re using SmartSense to provide statistics on your cluster)
  • on /mr-history/tmp, non-recursive
    • grant RWX permissions to group “public” so that anyone can create a folder
  • on /mr-history/tmp/*
    • grant RWX permissions to group “hadoop”, users “activity_analyzer” and “{OWNER}”
    • deny RWX permissions to group “public”
    • exclude deny RWX permissions to group “hadoop”, users “activity_analyzer” and “{OWNER}”

Note – it’s quite similar to the /spark-history approach.

Example – /user

For that specific folder, we know that no one except HDFS superusers should be able to create a folder in /user, and every folder in /user should be something like /user/<user> with permissions <user>:hdfs:700 (just like usual home directories).

Then, we just have to set chmod 000 on the /user folder and add an “allow” policy:

  • on /user/{USER}
    • allow policy granting RWX permissions to user “{USER}”

But we also have a particular situation to manage: the share lib folder of Oozie, by default, is located at /user/oozie/share/lib. We need to allow RX access to that folder to anyone launching Oozie jobs:

  • on /user/oozie/share/lib
    • allow policy granting RX access to group “public”
    • allow policy granting RWX access to group “hadoop”

Since /user is set with chmod 000 we are sure no one can go inside that folder unless an allow policy is created.
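To illustrate what the {USER} variable buys us here, this is a small Python sketch of how I picture the matching: the variable is replaced by the name of the user making the request, and the resulting path is compared with the one being accessed. The function name and the assumption that the policy is recursive are mine; Ranger’s real matcher is more elaborate.

```python
def policy_applies(pattern, path, user, recursive=True):
    """Expand {USER} in the policy path, then match exactly or (if recursive) by prefix."""
    resource = pattern.replace("{USER}", user)
    if path == resource:
        return True
    return recursive and path.startswith(resource.rstrip("/") + "/")


# jdoe is covered by the policy inside his own home directory...
print(policy_applies("/user/{USER}", "/user/jdoe/data.csv", "jdoe"))           # True
# ...but the job user is not, whatever chmod jdoe applies to that directory.
print(policy_applies("/user/{USER}", "/user/jdoe/data.csv", "myProjectUser"))  # False
```

One single policy therefore covers every home directory without listing any user.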

Example – /app-logs

Structure of /app-logs is very similar to what we’ve seen so far:

/app-logs with yarn:hadoop:777
/app-logs/<user> with <user>:hadoop:770

We’re adding the following rules:

  • on /app-logs, non-recursive
    • grant RWX permissions to group “public” so that anyone can create a folder (when a user is launching a job for the first time on the cluster, the user needs permission to create the folder)
  • on /app-logs/*
    • grant RWX permissions to group “hadoop” and user “{OWNER}”
    • deny RWX permissions to group “public”
    • exclude deny RWX permissions to group “hadoop” and user “{OWNER}”

We can now set chmod 000 on /app-logs to secure that folder.

Example – /tmp

The /tmp folder is similar to what we did so far, but with an exception: there is a /tmp/hive folder used by Hive to store temporary data when Hive queries are executed. Because of HIVE-18287, this folder needs to have chmod 733 or above (at HDFS ACLs level).

In /tmp/hive, we have folders like /tmp/hive/<user> with <user>:hdfs:700.

To summarize, we need to:

  • Allow users to create folders in /tmp
  • Keep chmod 733 on /tmp/hive
  • Allow users to create folders in /tmp/hive/
  • Only allow owners to modify data in /tmp/*

As you can see, it’s going to be difficult to manage because of the embedded folder used by Hive. The best approach is to change Hive’s configuration to move the scratch directory to another location. What I suggest is:

hive.exec.scratchdir=/apps/hive/tmp

This way you can easily define policies on that particular folder by following the same approach we did so far on the other folders. Once this is done, it’s also easy to have the same approach to secure the /tmp folder:

  • on /apps/hive/tmp, non-recursive
    • grant RWX permissions to group “public” so that anyone can create a folder (when a user is launching a job for the first time on the cluster, the user needs permission to create the folder)
  • on /apps/hive/tmp/*
    • grant RWX permissions to group “hadoop” and user “{OWNER}”
    • deny RWX permissions to group “public”
    • exclude deny RWX permissions to group “hadoop” and user “{OWNER}”
  • on /tmp, non-recursive
    • grant RWX permissions to group “public” so that anyone can create a folder
  • on /tmp/*
    • grant RWX permissions to group “hadoop” and user “{OWNER}”
    • deny RWX permissions to group “public”
    • exclude deny RWX permissions to group “hadoop” and user “{OWNER}”

Note – if you are using Spark/Spark2, don’t forget that it relies on Hive’s configuration files. In case you’re using an Ambari managed Hortonworks cluster, you would have to add custom properties in “Custom spark-hive-site-override” for both Spark and Spark2 services.

Note – if you are using Oozie and have workflows using Spark actions, then you’d have to leverage the action configuration feature in Oozie. You can have a look here. In the case of the Hortonworks distribution, by default, you have

oozie.service.HadoopAccessorService.action.configurations=*=action-conf

And you have the following files/folders:

/etc/oozie/conf/action-conf/hive.xml
/etc/oozie/conf/action-conf/hive/hive-site.xml
/etc/oozie/conf/action-conf/hive/tez-site.xml
/etc/oozie/conf/action-conf/hive/atlas-application.properties

The hive-site.xml is automatically copied from the /etc/hive/conf folder when the Oozie server is restarted. But you need to make a change to have this configuration loaded for Spark actions. You can create spark and spark2 folders and create a symbolic link to the hive-site.xml file.

Conclusion

The objective of this post was to give a quick overview of how Ranger works with HDFS and what you need to consider if you want to secure your cluster. As you saw in this post, if you also want to prevent rogue users from accessing data by changing the behavior of an application, you need to set a few rules in order to secure the folders having a default chmod 777 at the HDFS ACLs level.

This post is certainly not exhaustive but should give you an idea of what you can do. As usual, feel free to comment / ask questions, and thanks for reading thus far.

Hue/Oozie causing CPU overload

Quick post about an issue I faced today on one of the clusters: I received an alert about abnormally high CPU usage on one of the master nodes. A quick htop gave me the culprit: the Oozie server hosted on this node.

I looked at the logs and didn’t see anything unusual in the oozie.log file. But by looking at the oozie-audit.log file, I noticed a very large number of requests being issued by Hue on behalf of proxied users:

# sed 's/.* DoAs user \[\(.*\)\] Request .*/\1/g' oozie-audit.log | sort | uniq -c | sort -nr | head
279616               jdoe
27902                zoaks
16018                mparisien
14025                gkass
12211                lzastrow
9730                 sleaf
7460                 sladwig
6048                 vespinoza
5815                 lkonen
2862                 lrayburn

It appeared my John Doe was issuing more than 5 requests per second to the Oozie server using Hue causing the high CPU consumption.
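For reference, the same per-user count can be done in a few lines of Python, which is handy if you want to go further (per-hour buckets, alerts, etc.). The regular expression mirrors the sed expression above; the sample lines are made up and only contain the part of the audit line that the pattern relies on. One difference with the sed pipeline: lines without a “DoAs user” field are skipped here instead of being counted as-is.

```python
import re
from collections import Counter

DOAS = re.compile(r" DoAs user \[(.*)\] Request ")


def count_doas_users(lines):
    """Count audit lines per proxied user."""
    counts = Counter()
    for line in lines:
        match = DOAS.search(line)
        if match:
            counts[match.group(1)] += 1
    return counts


# Hypothetical, heavily shortened audit lines.
SAMPLE = [
    "... DoAs user [jdoe] Request [...]",
    "... DoAs user [zoaks] Request [...]",
    "... DoAs user [jdoe] Request [...]",
    "... some line without a proxied user ...",
]

for user, hits in count_doas_users(SAMPLE).most_common(10):
    print(hits, user)
# 2 jdoe
# 1 zoaks
```

On a real file, pass the open file object instead of `SAMPLE`, for example `count_doas_users(open("oozie-audit.log"))`.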

When you are on the Oozie dashboards in Hue, an auto-refresh feature issues requests to the Oozie server every 5 seconds to get the latest statuses. The problem is that if a user opens multiple tabs in the browser, it can lead to a lot of requests. Now… if the user forgets to close the browser and remains connected, you have a nice DDoS-like situation.

By looking at the Hue documentation, I thought I found a solution with the below:

[desktop]
[[auth]]
# Users will automatically be logged out after ‘n’ seconds of inactivity.
# A negative number means that idle sessions will not be timed out.
idle_session_timeout=-1

I tried setting this value to 600 seconds (10 minutes) to get inactive people automatically logged out. It works fine when you’re staying on a static page in Hue but not if you’re staying on the Oozie pages… the auto-refresh is keeping you “active” even though you’re not.

The only option I found is to use the ttl (time-to-live) parameter to define when the cookie will expire and force the user to authenticate again. The issue with this parameter is that it’ll log out the user even though the user is active and actually using Hue.

To avoid any unpleasant user experience, you can set this parameter to something like 28800 (8 hours):

[desktop]
[[auth]]
ttl=28800

It does not solve the original issue because you’ll keep your Oozie server receiving a lot of requests for 8 hours but, at least, you limit how long this situation can last.

The best solution, assuming you have installed multiple Oozie instances for high availability behind a load balancer, is to configure the LB to extract the user name from the requested URL (&doAs=<user>) and to throttle the number of requests issued by a single user. That will provide the best protection without impacting the user experience. Look at your LB’s documentation to configure such a solution.
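To show the idea independently of any load balancer product, here is a minimal Python sketch of per-user throttling keyed on the doAs parameter. It is not LB configuration: the class name, the limit, the window and the sample URL are all illustrative choices of mine, and a real LB would express the same logic in its own rule language.

```python
from collections import defaultdict, deque
from urllib.parse import parse_qs, urlsplit


class PerUserThrottle:
    """Sliding window: at most `limit` requests per `window` seconds and per user."""

    def __init__(self, limit, window):
        self.limit = limit
        self.window = window
        self._seen = defaultdict(deque)  # user -> timestamps of accepted requests

    def allow(self, url, now):
        """Return True if the request may go through at time `now` (in seconds)."""
        user = parse_qs(urlsplit(url).query).get("doAs", ["<anonymous>"])[0]
        recent = self._seen[user]
        while recent and now - recent[0] >= self.window:
            recent.popleft()             # forget requests older than the window
        if len(recent) >= self.limit:
            return False                 # over quota: reject or delay this request
        recent.append(now)
        return True


throttle = PerUserThrottle(limit=2, window=1.0)
url = "/oozie/v2/jobs?len=100&doAs=jdoe"  # illustrative request URL
print([throttle.allow(url, t) for t in (0.0, 0.1, 0.2, 1.0)])
# [True, True, False, True]
```

Keying on the proxied user rather than on the client IP matters here because all the requests reach Oozie from the Hue host.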

Using the SmartSense Activity Explorer for cluster reporting

In the Hortonworks Data Platform, there is SmartSense, a service that analyzes cluster diagnostic data, identifies potential issues, and recommends specific solutions and actions.

SmartSense is made of multiple components and one of the components is the Activity Explorer, a customized Zeppelin notebook used to access and display the data collected by the Activity Analyzer instances, which is stored in an HBase instance and accessed using Phoenix.

The Activity Explorer gives access to a lot of very useful data when administrating a cluster. For an exhaustive list, have a look at the documentation here.

By default, this Activity Explorer / Zeppelin is configured with the Phoenix interpreter only. The idea of this post is to describe how we can add the JDBC interpreter (or any other interpreter) to allow administration teams to use this specific Zeppelin instance as a more general tool for cluster reporting.

One might wonder why I’m not using the Zeppelin service available in the HDP stack. The reason is quite simple: usually, the Zeppelin instances would be deployed on the edge nodes (to be used by the project teams / users of the cluster) while the Activity Explorer would be deployed on an administration node and only accessed by the administrators of the cluster. The idea is to keep Zeppelin instances separated based on the purpose.

The first step is to package the JDBC interpreter. Go to the node where you installed the Zeppelin service (where all the interpreters are installed) – not the node where you installed the Activity Explorer component.

cd /usr/hdp/current/zeppelin-server/interpreter/
zip -r jdbc.zip jdbc/

And deploy this ZIP file on the node where the Activity Explorer is installed (after copying it into the folder below):

cd /usr/hdp/share/hst/activity-explorer/interpreter/
unzip jdbc.zip

Restart the Activity Explorer component so that the interpreter is available for configuration.

Go to the interpreter configuration page and add a new one, selecting the JDBC type. Configure the interpreter as needed based on your cluster (you can check the configuration you set for this interpreter in the Zeppelin service). In particular, you’ll need:

zeppelin.jdbc.auth.type=KERBEROS
zeppelin.jdbc.principal=<principal of the activity explorer>
zeppelin.jdbc.keytab.location=<keytab of the activity explorer>
hive.proxy.user.property=hive.server2.proxy.user

Note: do not use _HOST in the principal name, use the host FQDN instead.

I also strongly recommend configuring SSL on the Activity Explorer, as well as proper authentication/authorization mechanisms. You can do all that through Ambari as you’d do for the Zeppelin service (have a look at the documentation here).

Since the Activity Explorer account is going to proxy your requests to Hive through the JDBC interpreter, you need to add the proper proxy rules:

hadoop.proxyuser.activity_explorer.groups=<administrator group>
hadoop.proxyuser.activity_explorer.hosts=<activity explorer host>

And you’ll have to restart the appropriate services.

If you stop here and restart the Activity Explorer component, you’ll lose your JDBC interpreter configuration because all of the interpreter configuration of the Activity Explorer is managed by Ambari and reset at each component restart. To prevent the loss of your configuration, you need to copy the content of the file:

/etc/smartsense-activity/conf/interpreter.json

(content of this file has been updated by the Activity Explorer after you added the JDBC interpreter)

And paste this content in Ambari / SmartSense / Advanced / Advanced activity-zeppelin-interpreter. This way, your configuration will remain the same.

Note: keep in mind that all this procedure might have to be done again after a SmartSense upgrade since it’s not the default deployment.

You’re now all set! If you’re wondering what can be done with the JDBC interpreter to enhance the cluster administration tasks… the first thing I can recommend is to create Hive tables on top of the Ranger audits stored in HDFS so that you can create long-term reports based on all the cluster audits (if you’re using Solr for the Ranger audits, this data is only stored for a short period of time; the default is 90 days). Creating Hive tables on top of the data sitting in HDFS can be really useful if you have compliance/security teams looking for audit reporting.

You could also use the JDBC interpreter to directly access the data in the database backend used for some services like Ambari, Ranger, Hive, etc. It can provide interesting data to build useful reports.

As always, thanks for reading, and feel free to ask questions / leave a comment.

NiFi 1.7+ – Terminate threads

One of the new features coming with NiFi 1.7.0 is the possibility to terminate stuck threads directly from the UI. Before this release, when you had a processor getting stuck (like a custom processor with a deadlock) you had no option but to restart NiFi… and that’s not really great in a multi-tenant setup.

Let’s see this new feature with an example: I’m using a GenerateFlowFile and, using the debug mode of my IDE, I’m going to simulate an issue with the thread by adding a break point in the onTrigger() method of the processor.

Screen Shot 2018-07-02 at 2.56.57 PM.png

When the processor is running, we can see the number of threads currently used by the processor (top right). If the number of tasks is not increasing (and equal to 0 after 5 minutes) and the number of threads is constant, it probably means you have a stuck thread. In this case, it is always recommended to do a thread dump of NiFi to see what is going on. To do that:

./bin/nifi.sh dump /tmp/thread-dump.txt

If we look at the content of the generated file and search for my GenerateFlowFile processor, we can see something like:

"Timer-Driven Process Thread-5" Id=57 RUNNABLE (suspended)
 at org.apache.nifi.processors.standard.GenerateFlowFile.onTrigger(GenerateFlowFile.java:210)
 at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
 at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
 at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
 at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 Number of Locked Synchronizers: 1
 - java.util.concurrent.ThreadPoolExecutor$Worker@2388307

In this case, the thread is shown as “suspended” at the line where I put my break point in my IDE.

Note that having a stuck thread is usually a symptom of a badly designed processor or an underlying issue with the code dependencies of the processor. Having the thread dump can help locate and fix the issue.
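Thread dumps get long quickly, so here is a small Python sketch to list only the threads whose stack mentions a given class. It assumes the layout shown above (a header line starting with the quoted thread name, followed by the stack frames); the function name is mine and the second thread in the sample is invented for the example.

```python
import re

HEADER = re.compile(r'^"(?P<name>[^"]+)" Id=(?P<id>\d+) (?P<state>.+)$')


def threads_mentioning(dump_text, needle):
    """Return (name, id, state) for each thread whose stack contains `needle`."""
    results, current, hit = [], None, False
    for line in dump_text.splitlines():
        header = HEADER.match(line)
        if header:
            if current and hit:
                results.append(current)
            current = (header.group("name"), int(header.group("id")), header.group("state"))
            hit = False
        elif current and needle in line:
            hit = True
    if current and hit:
        results.append(current)
    return results


SAMPLE_DUMP = '''"Timer-Driven Process Thread-5" Id=57 RUNNABLE (suspended)
 at org.apache.nifi.processors.standard.GenerateFlowFile.onTrigger(GenerateFlowFile.java:210)
 at java.lang.Thread.run(Thread.java:745)

"Some Other Thread" Id=12 TIMED_WAITING
 at java.lang.Thread.sleep(Native Method)
'''

print(threads_mentioning(SAMPLE_DUMP, "GenerateFlowFile"))
# [('Timer-Driven Process Thread-5', 57, 'RUNNABLE (suspended)')]
```

On a real dump, read the file produced by nifi.sh and pass the class name of the processor you suspect.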

If I try to stop the processor:

Screen Shot 2018-07-02 at 3.02.23 PM.png

I’ll now see two threads being used by the processor:

Screen Shot 2018-07-02 at 3.03.09 PM

The processor is now in the process of being stopped but you won’t be able to update or restart the processor until it is actually stopped. However, if the initial thread is stuck, you’ll remain in this state forever (and, eventually, you’ll have to restart NiFi if you’re running a version below 1.7.0).

With NiFi 1.7.0, you now have the possibility to terminate the thread (meaning you don’t have to wait for the thread to be completely stopped):

Screen Shot 2018-07-02 at 3.06.58 PM

If I terminate the thread, here is what I’ll see:

Screen Shot 2018-07-02 at 3.08.07 PM

Meaning there is no more active thread and there is one thread being terminated.

Under the hood, the framework is issuing an “interrupt” for the thread and will perform a “reload” of the processor meaning there is a new instance of the processor class being created. This allows the old class to eventually shut down gracefully, close connections, etc. However this needs to be used with care: if the class is maintaining some values in internal variables, this information will be lost in the process (I’m not talking about state information that can be saved at framework level by the processors).

An interrupt is an indication to a thread that it should stop what it is doing and do something else. It’s up to the programmer to decide exactly how a thread responds to an interrupt, but it is very common for the thread to terminate. However if, for some reason, the thread does not respond to the interrupt, then the thread will stay in the “being terminated” state forever (in my above example, I can see the thread staying in that state because of my break point).

You can now update the configuration and start the processor again (even if a thread is still being terminated). Once the thread is terminated, you’ll get back to a nominal situation:

Screen Shot 2018-07-02 at 4.54.51 PM.png

If you don’t want to set up a debug environment, you can simulate a stuck thread with the following ExecuteScript processor:

Screen Shot 2018-07-02 at 4.56.49 PM.png

If I start and then stop the processor:

Screen Shot 2018-07-02 at 6.02.37 PM.png

And I’ll get the following message when terminating the thread:

Screen Shot 2018-07-02 at 6.02.58 PM.png

You can also use the DebugFlow processor with “@OnStopped Pause Time” set to something like “5 min” and “Ignore Interrupts When Paused” set to “true”:

Screen Shot 2018-07-02 at 6.49.29 PM.png

That’s it for this post! This new feature is really nice in a multi-tenant environment where you can’t afford a restart of the service. If you’re in a situation where you need to use this feature, remember to take a thread dump before actually terminating the thread. This will be really useful to investigate the issue. Also, remember that terminating a thread is not a “normal” operation, it means there is something wrong somewhere and it could very likely happen again.

Thanks for reading this post and, as always, feel free to comment / ask questions!

NiFi 1.7+ – XML Reader/Writer and ForkRecord processor

Starting with NiFi 1.7.0 and thanks to the work done by Johannes Peter on NIFI-4185 and NIFI-5113, it’s now possible to use an XML reader and writer in the Record processors to help you process XML data. Before that, you had a few options requiring a bit of additional work to get things working (see here).

I won’t go into the details because the reader/writer are really well documented (have a look at the additional details for examples).

Also, with NIFI-4227, a ForkRecord processor is now available to “explode” your data when you have one or multiple embedded arrays in your data set and you need to normalize your data. Have a look at the ForkRecord documentation (additional details) for examples.

Example – Introduction

Let’s go through an example using the XML Reader and the ForkRecord processor. I assume I’m receiving XML data with the following schema:

Screen Shot 2018-06-27 at 10.39.20 AM.png

And here is a dummy file I’m receiving that I’ll use for this example:

Screen Shot 2018-06-27 at 2.15.28 PM.png

The corresponding Avro schema can be found here. Defining the Avro schema corresponding to your data is the most “difficult” part but once it’s done, everything else is really easy with the Record processors. You can get some help from the InferAvroSchema processor, but it should only be used to get an initial version of your Avro schema when you start developing your workflows (it should not be part of your production workflows).

I’m going to quickly explain the following workflow (template available here):

Screen Shot 2018-06-27 at 2.24.43 PM.png

Example – XML to JSON conversion

The GenerateFlowFile is used to generate my XML data and to send the content to the Record processors I’m using. The upper part of the workflow is just a ConvertRecord processor to perform the XML to JSON conversion thanks to the schema. The other parts of the workflow are used to extract each “object” of my schema (customer, address, account and transaction).

Here are the configuration details for the XML to JSON conversion. The GenerateFlowFile is very simple:

Screen Shot 2018-06-28 at 2.25.23 PM

Note the attribute ‘schema.name’ set with the value of the schema I added in my Avro schema registry controller service:

Screen Shot 2018-06-28 at 2.26.24 PM.png

The XML Reader is configured to get the schema name from the ‘schema.name’ attribute of the processed flow files:

Screen Shot 2018-06-28 at 2.27.21 PM.png

The JSON writer does not contain any specific configuration, it’ll write the data using the schema inherited at the reader level:

Screen Shot 2018-06-28 at 2.28.22 PM

The ConvertRecord processor can now be configured for the XML to JSON conversion:

Screen Shot 2018-06-28 at 2.29.20 PM.png

That’s it! You now have a very easy and powerful way to perform your XML to JSON conversion. Here is the flow file content before the processor:

Screen Shot 2018-06-27 at 2.15.28 PM

Here is the JSON generated by the processor:

Screen Shot 2018-06-28 at 2.31.23 PM

If you need to manipulate your records to go from one schema to another, you can use the UpdateRecord processor.

Note – using the XML Reader/Writer and Record processors, you can expect performance as good as the best options presented in my previous post about XML data processing.

Example – ForkRecord processor

The goal here is to extract the data contained in the arrays into separate CSV files. From one XML file, I want to generate 4 CSV files that I could use, for instance, to send data into database tables.

Let’s start with the first one regarding customer data. I’m adding a ‘customer’ Avro schema in the Avro schema registry:

Screen Shot 2018-06-28 at 2.42.58 PM.png

I’m using an UpdateAttribute to set the ‘output.schema’ attribute with the name of the schema that I want the CSV Record Writer to use. Then, I’m just using a ConvertRecord processor to convert from XML to CSV, and I only keep the root level fields (customer data). Here is the configuration of my CSV writer:

Screen Shot 2018-06-28 at 2.45.23 PM.png

Here is the output when processing my XML data:

Screen Shot 2018-06-28 at 2.46.32 PM.png

Now, I want to extract the data contained in arrays and that’s where the ForkRecord is useful. To use it, I need to specify a custom property containing the Record path to the array that the processor should process. The processor then offers two modes:

  • Split – that will preserve the input schema but will create one flow file per element contained in the array.
  • Extract – that will generate flow files using the elements of the array, with the possibility to include the parent fields up to the root level in case you want to keep some fields as keys/identifiers of your array elements. Note that you can define which parent fields you want to preserve by setting the appropriate schema with only the fields you want.
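
To make the difference between the two modes more concrete, here is a minimal sketch in plain Python (not NiFi code). It assumes the address array sits under a hypothetical /addresses/address path, and every field other than customer_id is made up for the illustration:

```python
from copy import deepcopy

# Hypothetical record: only customer_id comes from the post, the
# 'addresses/address' nesting and the other fields are assumptions.
customer = {
    "customer_id": "C1",
    "name": "Jane",
    "addresses": {"address": [{"city": "Paris"}, {"city": "Lyon"}]},
}


def fork_split(record, outer, inner):
    """Split mode: one record per array element, input schema preserved."""
    forked = []
    for element in record[outer][inner]:
        clone = deepcopy(record)
        # Same shape as the input, but the array holds a single element.
        clone[outer][inner] = [deepcopy(element)]
        forked.append(clone)
    return forked


def fork_extract(record, outer, inner, parent_fields=()):
    """Extract mode: one flat record per element, plus chosen parent fields."""
    parents = {name: record[name] for name in parent_fields}
    return [{**parents, **element} for element in record[outer][inner]]


split_records = fork_split(customer, "addresses", "address")
extract_records = fork_extract(customer, "addresses", "address", ["customer_id"])
```

In Split mode each output still looks like a full customer record, while in Extract mode each output is a flat row that only carries the parent fields I asked for.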

In our case we’re using the Extract mode and we are including the parent fields: for the ‘address’ data for instance, we want to keep the customer_id field in the output (that field would be used as a key between the customer data and the address data in the database world). Here is the schema I define in my Avro schema registry for the ‘address’ object:

Screen Shot 2018-06-28 at 2.52.03 PM

I’m using an UpdateAttribute to set the attributes I’m using in my reader and the ForkRecord processor:

Screen Shot 2018-06-28 at 2.55.04 PM

And I’m configuring the ForkRecord processor to use the ‘fork.path’ attribute defining the array of elements to be processed:

Screen Shot 2018-06-28 at 2.55.59 PM.png

Note – if you have multiple Record paths pointing to multiple arrays with elements using the same output schema, you can define as many custom properties as you want.

And here is the output after the ForkRecord processor:

Screen Shot 2018-06-28 at 2.56.40 PM

Here are the Record paths I’m setting for the other type of objects I want to extract:

  • Account – fork.path = /accounts/account
  • Transaction – fork.path = /accounts/account[*]/transactions/transaction
    In this case, I need to specify account[*] because I want to access all the transactions of all the elements in the account array. But I could use the Record path features to only access some specific accounts based on some predicates.
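
As a rough illustration of what the /accounts/account[*]/transactions/transaction path selects, here is a small Python sketch (again, not NiFi code). Only customer_id and account_id come from this post; the amount field, the sample values and the account_filter helper are hypothetical:

```python
def fork_transactions(customer, account_filter=lambda account: True):
    """Walk every account (account[*]) and flatten its transactions,
    keeping customer_id and account_id as keys from the parent levels."""
    rows = []
    for account in customer["accounts"]["account"]:
        if not account_filter(account):  # stands in for a Record path predicate
            continue
        for transaction in account["transactions"]["transaction"]:
            rows.append({
                "customer_id": customer["customer_id"],
                "account_id": account["account_id"],
                **transaction,
            })
    return rows


# Hypothetical sample data.
customer = {
    "customer_id": "C1",
    "accounts": {"account": [
        {"account_id": "A1",
         "transactions": {"transaction": [{"amount": 10}, {"amount": 20}]}},
        {"account_id": "A2",
         "transactions": {"transaction": [{"amount": 30}]}},
    ]},
}

all_rows = fork_transactions(customer)
only_a2 = fork_transactions(customer, lambda account: account["account_id"] == "A2")
```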

And here is the output I get (after defining the appropriate schema in my schema registry):

  • Account data (I only keep the customer_id from the parent fields):

Screen Shot 2018-06-28 at 3.02.14 PM.png

  • Transaction data (I only keep the customer_id and account_id from the parent fields):

Screen Shot 2018-06-28 at 3.03.55 PM

Conclusion

The XML Reader & Writer are a really nice addition to NiFi if you need to process XML data and it will make your existing workflows much simpler! You really have to look at the Record processors and concepts as soon as you’ve a use case manipulating data complying to a schema.

The ForkRecord processor is a new processor that you can use if you need to manipulate schema oriented data containing a lot of arrays. That can be useful in case you need to normalize data before ingestion into databases.

Thanks for reading this post and, as always, feel free to comment and/or ask questions!

NiFi workflow monitoring – Wait/Notify pattern with split and merge

Thanks to NIFI-4262 and NIFI-5293, NiFi 1.7.0 contains a small improvement allowing users to extend the Wait/Notify pattern to merging situations.

If you’re not familiar with the Wait/Notify concept in NiFi, I strongly recommend you to read this great post from Koji about the Wait/Notify pattern (it’ll be much easier to understand this post).

When using the Merge* processors, you have one relationship to route the flow file containing the merged data and one relationship to route the original flow files used in the merging operation. With NIFI-4262 for the MergeContent processor (and NIFI-5293 for the MergeRecord processor), the original flow files used in the merge operation will now contain an attribute with the UUID of the flow file generated by the merge operation. This way, you now have a way to link things together and that’s really useful to leverage the Wait/Notify pattern.

Use case presentation

Let’s demonstrate the feature with a use case: I’m receiving ZIP files containing multiple CSV files that I want to merge together while converting the data into Avro and send it into a file system (for simplicity here, I’ll send the data to my local file system, but it could be HDFS for instance).

The idea is the following:

ListFile -> FetchFile -> UnpackContent -> MergeRecord -> PutFile

That’s easy and it works great… BUT… I want to be able to monitor and track exactly how each ZIP file is handled and when the data contained in a ZIP file is stored in the destination. I have a requirement to maintain in real-time the following tables:

Table ‘zip_files_processing’

  • zip_file_name – unique name of the received ZIP files
  • ff_uuid – flow file UUID (useful to replay events in case of errors)
  • ingestion_date – date when the ZIP file starts being processed in NiFi
  • storage_date – date when all the data of the ZIP file has been processed
  • number_files – number of CSV files contained in the ZIP file
  • number_files_OK – number of files successfully stored in the destination
  • number_files_KO – number of files unsuccessfully processed
  • status – current status of the process for the ZIP file (IN_PROGRESS, OK, KO)

Table ‘files_ingestion’

  • zip_file_name – unique name of the zip file containing the CSV file
  • file_name – name of the CSV file
  • ff_uuid – flow file UUID (useful to replay events in case of errors)
  • storage_date – date when the CSV file has been processed
  • storage_name – name of the destination file containing the data
  • status – current status of the process for the CSV file (IN_PROGRESS, OK, KO)

That’s where the Wait/Notify is going to be extremely useful.

High level description of the workflow

Here is the main idea behind the workflow:

Screen Shot 2018-06-13 at 4.14.17 PM.png

Let’s describe the workflow at a high level:

  • List the data available and insert rows in my zip_files_processing table for each listed file so that I can track ingested data
  • Fetch the data (in case of error, I update the zip_files_processing table to set the status to KO)
  • I unpack my ZIP files to get a flow file per CSV file, and I route the original flow file to a Wait processor. The flow file representing the ZIP file will be released only once all the associated CSV files have been fully processed
  • For each CSV file, I insert a row in my files_ingestion table to track the status of each CSV file being processed
  • I merge the data together while converting the data from CSV to Avro, and I route the original flow files to a Wait processor. The flow files will be released only once the merged flow file has been sent to the final destination
  • I send the merged flow file to the destination (local file system in this case) and then use the Notify processor to release all the original flow files used to create the merged flow file
  • Then I use the released flow files to notify the first Wait processor and also release the flow file representing the corresponding ZIP file
  • With all the released flow files, I can update my monitoring tables with the appropriate information

Workflow details

Here is the full configuration of the workflow and some explanations around the parameters I used. Also, here is the template of the workflow I used – it’ll be easier to understand if you have a look, give it a try and play around with it (even without the SQL processors) – note that you will need to add a Distributed Map Cache server controller service to have the Distributed Map Cache clients working with Wait/Notify processors:

Screen Shot 2018-06-26 at 1.50.35 PM.png

Let’s start describing the ingestion part:

Screen Shot 2018-06-26 at 1.52.57 PM.png

Nothing very strange here. Just note the UpdateAttribute processor I’m using to store the original UUID in a dedicated attribute so that I don’t lose it with the UnpackContent processor (which creates new flow files with new UUIDs).

Screen Shot 2018-06-26 at 1.56.13 PM

The first PutSQL is used to insert new lines in my ZIP monitoring table for each ZIP file I’m listing. I’m executing the below query:

Screen Shot 2018-06-26 at 1.55.31 PM

The second PutSQL is used in case of error when fetching the file from the local file system or when an error occurs while unpacking the archive. In such a case, I’m executing the below query:

Screen Shot 2018-06-26 at 1.57.24 PM.png

Let’s now focus on the second part of the workflow:

Screen Shot 2018-06-26 at 1.58.28 PM

The UnpackContent processor is used to extract the CSV files from the ZIP file. For one flow file representing the ZIP file, it will generate one flow file per CSV file contained in the ZIP file. The original flow file will then be routed to the original relationship while the flow files for the CSV data will be routed to the success relationship. The flow files containing the CSV data will have a ‘fragment.identifier’ attribute and that’s the common attribute between the original flow file and the generated ones that I’m going to use for Wait/Notify (this attribute is automatically generated by the processor). Besides, the original flow file also has a ‘fragment.count’ attribute containing the number of flow files generated for the CSV data.

The Wait processor is configured as below:

Screen Shot 2018-06-26 at 2.03.46 PM

Basically, I’m setting the identifier of the release signal to the common attribute shared between the original and the generated flow files, and I’m saying that I have to wait for ‘fragment.count’ signals before releasing the original flow file to the success relationship. Until then, the flow file is transferred to the ‘wait’ relationship, which loops back on the Wait processor. Also, I configured a 10-minute timeout in case I don’t receive the expected signals, in which case the flow file would be routed to the ‘expired’ relationship.
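
If it helps, the routing logic of this Wait processor can be pictured with the small Python simulation below. This is only a sketch of the idea, not how NiFi implements it: the SignalCache class is a hypothetical in-memory stand-in for the distributed map cache, and wait_route is a made-up helper name.

```python
from collections import defaultdict


class SignalCache:
    """In-memory stand-in for the distributed map cache (illustration only)."""

    def __init__(self):
        # signal identifier -> counter name -> count
        self._counters = defaultdict(lambda: defaultdict(int))

    def notify(self, signal_id, counter_name="default", delta=1):
        self._counters[signal_id][counter_name] += delta

    def total(self, signal_id):
        return sum(self._counters[signal_id].values())


def wait_route(cache, flow_file, waited_seconds, timeout_seconds=600):
    """Return the relationship the ZIP flow file would be routed to."""
    signal_id = flow_file["fragment.identifier"]
    target = int(flow_file["fragment.count"])
    if cache.total(signal_id) >= target:
        return "success"
    if waited_seconds >= timeout_seconds:  # the 10-minute timeout
        return "expired"
    return "wait"  # loops back on the Wait processor


cache = SignalCache()
zip_flow_file = {"fragment.identifier": "zip-1", "fragment.count": "3"}

before = wait_route(cache, zip_flow_file, waited_seconds=5)
for _ in range(3):  # one signal per processed CSV file
    cache.notify("zip-1")
after = wait_route(cache, zip_flow_file, waited_seconds=10)
```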

After my Wait processor, in case of error, I’m using an UpdateAttribute to retain this error:

Screen Shot 2018-06-26 at 2.08.50 PM.png

And after that, I’m using the PutSQL processor to update the ZIP monitoring table by executing the below query:

Screen Shot 2018-06-26 at 2.09.46 PM.png

The ‘wait.*’ attributes are generated by the Wait processor when releasing the flow file and contain the number of signals sent by the Notify processor (I’ll come back to that when describing the corresponding Notify processor). In this case, it allows me to know how many files contained in the ZIP file have been successfully processed and how many have not been successfully processed.

After the UnpackContent and the success relationship, I’m using a PutSQL to insert lines in my CSV monitoring table by executing the below query:

Screen Shot 2018-06-26 at 2.19.41 PM

The ‘segment.original.filename’ is the name of the ZIP file from which the CSV files have been extracted.

Let’s move to the final part of the workflow (it’s a bit loaded because I wanted to have everything in my screenshot):

Screen Shot 2018-06-26 at 2.27.15 PM.png

The MergeRecord is used to merge the data together while converting from CSV to Avro:

Screen Shot 2018-06-26 at 2.28.11 PM

In a real world use case, you would configure the processor to merge much more data together… but for this demonstration I’m merging only a few flow files together so that I can quickly see the results.

Also I’m generating dummy data like:

Screen Shot 2018-06-26 at 2.29.41 PM.png

And the corresponding schema I’m using in the Avro Schema Registry controller service is:

Screen Shot 2018-06-26 at 2.30.55 PM.png

After the MergeRecord processor, there is the ‘original’ relationship, which receives the flow files used during the merge operation, and the ‘merged’ relationship, which receives the flow files containing the merged data. As explained above, the original flow files contain a ‘merge.uuid’ attribute with the UUID of the merged flow file. That’s the attribute I’m using in the Wait processor configuration:

Screen Shot 2018-06-26 at 2.35.49 PM

Basically, I’m creating a signal identifier in the distributed cache with the ‘merge.uuid’ attribute and as long as the signal counter is greater than 1, I’m releasing one flow file at a time. You’ll see in my Notify configuration (coming back to that in a bit) that, once my merged data is processed, I’m using the Notify processor to set the signal counter associated to the identifier with a value equal to the number of merged flow files. This way, once the notification is sent, all the corresponding flow files will be released one by one.

I won’t describe the PutFile processor which is used to store my data (it could be a completely different processor), but just notice the UpdateAttribute I’m using in case of failure:

Screen Shot 2018-06-26 at 2.40.47 PM.png

And the UpdateAttribute I’m using in case of success:

Screen Shot 2018-06-26 at 2.41.24 PM

Note that I’m setting attributes with the same prefix ‘wn_ingestion.*’. Then, look at my Notify processor:

Screen Shot 2018-06-26 at 3.13.05 PM.png

The ‘uuid’ attribute is equal to the ‘merge.uuid’ I mentioned above for the original flow files used in the merge operation. Besides, I’m using the Attribute Cache Regex property to copy the corresponding attributes to all the flow files that are going to be released thanks to this signal. It means that when a merged flow file is sent to the Notify processor, all the original flow files with the corresponding ‘merge.uuid’ sitting in the ‘wait’ relationship will be released and will get the ‘wn_ingestion.*’ attributes of the merged flow file. That’s how I’m “forwarding” the information about when the merged flow file has been sent to the destination, whether the merged flow file has been successfully processed or not, and what filename has been used to save the file in the destination.
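
Here is a small Python sketch of this “forwarding” idea, under a few assumptions of mine: the attribute names behind the ‘wn_ingestion.’ prefix are invented, the regular expression is matched against the whole attribute name, and release is a hypothetical helper, not a NiFi API.

```python
import re


def cached_attributes(attributes, regex):
    """Keep only the attributes whose name matches the regular expression."""
    pattern = re.compile(regex)
    return {name: value for name, value in attributes.items()
            if pattern.fullmatch(name)}


def release(waiting_flow_files, merged_flow_file, regex=r"wn_ingestion\..*"):
    """Release the originals whose 'merge.uuid' equals the merged flow file
    'uuid', copying the cached attributes onto each of them."""
    forwarded = cached_attributes(merged_flow_file, regex)
    return [{**flow_file, **forwarded}
            for flow_file in waiting_flow_files
            if flow_file.get("merge.uuid") == merged_flow_file["uuid"]]


# Hypothetical attribute names and values.
merged = {
    "uuid": "m-1",
    "filename": "merged.avro",
    "wn_ingestion.status": "OK",
    "wn_ingestion.storage_name": "merged.avro",
}
waiting = [
    {"filename": "a.csv", "merge.uuid": "m-1"},
    {"filename": "b.csv", "merge.uuid": "m-1"},
    {"filename": "c.csv", "merge.uuid": "m-2"},
]

released = release(waiting, merged)
```

Note that only the attributes matching the expression are copied: the released flow files keep their own filename and simply gain the tracking attributes.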

Once the original flow files are released from the Wait processor, they are sent to the Notify processor with the below configuration:

Screen Shot 2018-06-26 at 3.18.24 PM.png

In this case, I’m using the attribute inherited from the merged flow file sent to the destination to determine if the processing has been successful or not. If yes, then I’m setting the counter name to ‘OK’, otherwise it’s set to ‘KO’. That’s how I’m getting the information about how many CSV files from a given ZIP file have been successfully processed or not (remember the ‘wait.counter.OK’ and ‘wait.counter.KO’ attributes created when the flow file corresponding to the ZIP file is released?).
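
To illustrate how these two counters can be turned into the values of the ZIP monitoring table, here is a hedged Python sketch. The status rule (KO as soon as one file failed, OK when all files succeeded) is my assumption for the example, and zip_summary is a hypothetical helper name:

```python
def zip_summary(attributes):
    """Derive the monitoring columns from the released ZIP flow file."""
    ok = int(attributes.get("wait.counter.OK", 0))
    ko = int(attributes.get("wait.counter.KO", 0))
    expected = int(attributes["fragment.count"])
    if ko > 0:
        status = "KO"            # assumption: any failed CSV marks the ZIP as KO
    elif ok == expected:
        status = "OK"
    else:
        status = "IN_PROGRESS"   # some signals have not been received yet
    return {
        "number_files": expected,
        "number_files_OK": ok,
        "number_files_KO": ko,
        "status": status,
    }


all_good = zip_summary({"fragment.count": "3", "wait.counter.OK": "3"})
one_bad = zip_summary({"fragment.count": "3",
                       "wait.counter.OK": "2", "wait.counter.KO": "1"})
```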

And now… I can execute my SQL query to update my CSV monitoring table with the latest information:

Screen Shot 2018-06-26 at 3.22.09 PM

Demonstration

Let’s see how the workflow behaves and what the result is in all kinds of situations. I’m running a Postgres instance in a Docker container to store my monitoring data. Here are the statements I used to create the two tables.

CREATE TABLE IF NOT EXISTS zip_files_processing (
 zip_file_name varchar(255),
 ff_uuid varchar(255),
 ingestion_date timestamp DEFAULT current_timestamp,
 storage_date timestamp,
 number_files integer,
 number_files_OK integer,
 number_files_KO integer,
 status varchar(15)
);
CREATE TABLE IF NOT EXISTS files_ingestion (
 zip_file_name varchar(255),
 file_name varchar(255),
 ff_uuid varchar(255),
 storage_date timestamp,
 storage_name varchar(255),
 status varchar(15)
);
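
If you want to play with the tracking logic without a Postgres instance, here is a minimal sketch using Python’s built-in sqlite3 module as a stand-in. The two helper functions and their queries are my own assumptions for the illustration; they are not the exact queries configured in the PutSQL processors.

```python
import sqlite3

# In-memory SQLite database standing in for Postgres (assumption).
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE IF NOT EXISTS zip_files_processing (
        zip_file_name TEXT,
        ff_uuid TEXT,
        ingestion_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        storage_date TIMESTAMP,
        number_files INTEGER,
        number_files_OK INTEGER,
        number_files_KO INTEGER,
        status TEXT
    )""")


def track_listed_zip(conn, zip_file_name, ff_uuid):
    """Insert a row when a ZIP file is listed (hypothetical query)."""
    conn.execute(
        "INSERT INTO zip_files_processing (zip_file_name, ff_uuid, status) "
        "VALUES (?, ?, 'IN_PROGRESS')",
        (zip_file_name, ff_uuid))


def track_finished_zip(conn, ff_uuid, number_files, ok, ko, status):
    """Update the row once the ZIP flow file is released (hypothetical query)."""
    conn.execute(
        "UPDATE zip_files_processing SET storage_date = CURRENT_TIMESTAMP, "
        "number_files = ?, number_files_OK = ?, number_files_KO = ?, status = ? "
        "WHERE ff_uuid = ?",
        (number_files, ok, ko, status, ff_uuid))


track_listed_zip(conn, "data_1.zip", "uuid-1")
track_finished_zip(conn, "uuid-1", number_files=3, ok=3, ko=0, status="OK")
row = conn.execute(
    "SELECT zip_file_name, number_files_OK, number_files_KO, status "
    "FROM zip_files_processing WHERE ff_uuid = ?", ("uuid-1",)).fetchone()
```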

Case 1 – nominal situation

In this case, all the received data is OK and all is working flawlessly… Let’s say I’m receiving two ZIP files. Once the data is received, I can see something like:

Screen Shot 2018-06-15 at 11.53.58 PM

Then the ZIP files are unpacked, and I can see my CSV files in the other table:

Screen Shot 2018-06-15 at 11.54.42 PM

Then the data is merged and sent to the destination. At the end, my table looks like:

Screen Shot 2018-06-15 at 11.56.28 PM.png

I can see that the data coming from the two ZIP files has been merged into the same final file and everything is OK. In the ZIP monitoring table, I have:

Screen Shot 2018-06-15 at 11.57.55 PM

Case 2 – cannot fetch ZIP file

In this case, the FetchFile processor is failing due to some permission issues. Here is what I get in my ZIP monitoring table:

Screen Shot 2018-06-16 at 12.04.35 AM

Note that I could add a ‘comment’ column with the error cause.

Case 3 – cannot unpack ZIP file

Result will be identical to case 2 as I’m handling the situation in the same way.

Case 4 – cannot merge records because a CSV file is not schema valid

I’m processing 2 ZIP files but a CSV file contained in one of the ZIP files cannot be merged with the others because it does not respect the schema. Note that all the flow files used during a merge operation will be sent to the failure relationship if one of the flow files is not valid. That’s why multiple flow files will be routed to the failure relationship even though there is only one bad file. To avoid this situation, it would be possible to add a validation step before the merge using the ValidateRecord processor.

For my ZIP files, I have:

Screen Shot 2018-06-22 at 8.42.00 PM

At the end, for this run, I have:

Screen Shot 2018-06-22 at 8.44.01 PM.png

Case 5 – cannot send a merged file to destination

In this case I’m simulating an error when sending a merged flow file to the destination. Here is the result at the end for the ZIP table:

Screen Shot 2018-06-22 at 9.57.09 PM

And the CSV files table:

Screen Shot 2018-06-22 at 9.57.44 PM

Conclusion

I’ve covered a non-exhaustive list of situations and the workflow could be improved, but it gives you a good idea of how the Wait/Notify pattern can be used in a NiFi workflow to help monitor what’s going on when dealing with split / merge situations. In addition to that, it’d be easy to add an automatic mechanism to try replaying failed flow files using the UUID and the provenance repository replay feature of NiFi.

Thanks for reading this post and, as always, feel free to comment and/or ask questions!

Automate workflow deployment in Apache NiFi with the NiFi Registry

Apache NiFi 1.6 (release note) is now out and one of the great new features is the addition of a Command Line Interface in the NiFi Toolkit binary that allows you to interact with NiFi instances and NiFi Registry instances.

In this post, I’ll discuss the ways you have to automate the deployment and promotion of workflows between multiple environments using the NiFi Registry. I’ll discuss and demonstrate the two options you need to know about:

  • the NiFi CLI released with the NiFi Toolkit 1.6.0 (download) (github)
  • The NiPyAPI python library 0.8.0 (github)

Before going into the details, let’s discuss the different scenarios we have… First, how many NiFi Registry instances do you have? Depending on your security requirements, it might not be allowed to have a single NiFi Registry reachable from all your environments. In that case, you could need a NiFi Registry instance running for each of your NiFi environments and you’d have to export/import the workflow information between the NiFi Registry instances to benefit from all the great features you have with the Registry.

  • Scenario 1 – one Registry to rule them all

Screen Shot 2018-03-27 at 4.57.03 PM

  • Scenario 2 – one Registry per environment

Screen Shot 2018-03-27 at 5.02.27 PM

Then you have two kinds of situations to handle:

  • Deployment of a completely new workflow from one environment to another
  • Update to a newer version of an existing workflow in production

I’ll go into the details of each of the 4 cases using each one of the two clients available for you to use. It’s up to you to decide what client you prefer to use. For each one of the cases I’ll only do one “hop” from Dev to Prod, but I’d expect you to have at least one additional hop to actually test your workflow in an integration environment.

Note 1 – I won’t use a secured environment in the following demos, but both of the tools are supporting secured deployments so that you can authenticate using certificates to perform the actions as a given user having the appropriate authorizations.

Note 2 – For this post, I choose to have the following workflow:

For the first version of the workflow I will have:

ListFile => FetchFile => PutHDFS

Then I’ll add a connection from FetchFile to also push the data into Elasticsearch by adding a PutElasticSearchHttp processor. The idea is also to show you how to handle variables that need to be changed according to the environment.

Before starting anything… How do you configure your NiFi instance/cluster to exchange with the NiFi Registry? To do that, open the top right menu, go to Controller Settings, and select the Registry Clients tab:

NiFi Menu

Configure Registry Client

List of registry clients

In scenario 1, both the NiFi dev and the NiFi prod are configured to communicate with the same NiFi Registry instance. In scenario 2, each environment is configured with its own NiFi Registry instance.

Important – even if case 1 is not the case you are interested in, that’s where I’m describing the initial workflow and the differences between versions 1 and 2. Please have a look as I won’t go into as much detail when describing the other cases.


  • Case 1 – One NiFi Registry using the NiFi CLI

Environment details:
NiFi Dev – http://nifi-dev:8080/nifi
NiFi Prod – http://nifi-prod:8080/nifi
NiFi Registry – http://nifi-registry:18080/nifi-registry

First of all, I create a process group that will contain my workflow. I call this process group “My Project”.

Process Group

Then, I design my workflow:

Screen Shot 2018-03-30 at 1.10.57 PM

I have externalized some properties because I know their values will change depending on the environment my workflow is running in. To use process group variables in a property, you first need to check that Expression Language is supported. Besides, you cannot use variables for sensitive properties (I’ll come back to this point at the end of this article). To check if Expression Language is supported on a property, you can hover over the question mark next to it:

Screen Shot 2018-03-30 at 1.12.58 PM

To add variables for a process group, you need to right click in the canvas (when inside the process group) and click on “variables”, or directly right click on the process group and click on “variables”:

Screen Shot 2018-03-30 at 1.14.57 PM

Here I define my variables that I will use in the configuration of the processors belonging to my workflow:

Screen Shot 2018-03-30 at 1.15.57 PM.png

This view allows you to define key/value pairs that can be referenced in components using the expression language. You can also view the list of the components referencing the variables you create.

Here is the configuration of my ListFile processor:

Screen Shot 2018-03-30 at 1.18.14 PM

Here is the configuration of my FetchFile processor:

Screen Shot 2018-03-30 at 1.18.22 PM.png

Here is the configuration of my PutHDFS processor:

Screen Shot 2018-03-30 at 1.18.32 PM.png

As you can see I’m using external variables for:

– the input directory used in ListFile
– the list of configuration files used in PutHDFS
– the Kerberos principal used in PutHDFS
– the Kerberos keytab used in PutHDFS
– the output directory used in PutHDFS

Next step is to create a bucket for my project in the NiFi Registry. To do that, go to the NiFi Registry home page, and go in Settings (top right):

Screen Shot 2018-03-30 at 1.32.10 PM.png

I create a bucket for the project I’m working on:

Screen Shot 2018-03-30 at 1.33.00 PM.png

A bucket is a logical place to store versioned items/resources, and it is on buckets that permissions/authorizations are assigned to users/groups. Currently the only resource type that can be stored in buckets is the versioned flow (or workflow). Each versioned flow has a name, a description, and 1 or more “snapshots” (versions). Each snapshot (or version) has metadata and content: the metadata contains a version number, a commit message, an author, and a commit date; the content contains the representation of the workflow itself at the time it was committed.
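
To picture this data model, here is a small Python sketch. The class and field names are mine and only mirror the description above; they are not the NiFi Registry API.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class Snapshot:
    """One version of a flow: metadata plus the flow content."""
    version: int
    comment: str
    author: str
    timestamp: datetime
    content: dict


@dataclass
class VersionedFlow:
    name: str
    description: str
    snapshots: list = field(default_factory=list)

    def commit(self, content, comment, author):
        """Store a new snapshot, numbered after the previous one."""
        snapshot = Snapshot(len(self.snapshots) + 1, comment, author,
                            datetime.now(timezone.utc), content)
        self.snapshots.append(snapshot)
        return snapshot

    def latest(self):
        return self.snapshots[-1]


@dataclass
class Bucket:
    """Logical container on which permissions would be assigned."""
    name: str
    flows: dict = field(default_factory=dict)


bucket = Bucket("MyProject")
flow = VersionedFlow("MyWorkflow", "demo workflow")
bucket.flows[flow.name] = flow
flow.commit({"processors": ["ListFile", "FetchFile", "PutHDFS"]},
            "first version", "dev-user")
flow.commit({"processors": ["ListFile", "FetchFile", "PutHDFS",
                            "PutElasticsearchHttp"]},
            "push data to Elasticsearch", "dev-user")
```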

Once my bucket is created, I can start versioning my workflow in the registry. To do that, right click on the process group and start version control:

Screen Shot 2018-03-30 at 1.40.39 PM.png

Choose the bucket in which you want to register your workflow:

Screen Shot 2018-03-30 at 1.42.21 PM

Once your workflow is version controlled in the Registry, you should be able to see it in the Registry:

Screen Shot 2018-03-30 at 1.43.45 PM.png

Also, in NiFi, you can see that your process group is now versioned and up-to-date with the Registry (green icon in the top left corner of the process group):

Screen Shot 2018-03-30 at 2.02.20 PM.png

We are now ready to deploy this workflow in production! Here is what we’re going to do:

– import the versioned workflow in production
– update the variables to set the values for the production environment
– start the process group

For the demo I’ll use it with the interactive shell, but you can script the commands (have a look here).

$ ./cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )\
[ `.-. | [  |'-| |-'[  |  /  \
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.6.0

Type 'help' to see a list of available commands, use tab to auto-complete.

Session loaded from /Users/pvillard/.nifi-cli.config

#>

Let’s list the buckets in my Registry:

#> registry list-buckets -u http://nifi-registry:18080

# Name      Id                                   Description
- --------- ------------------------------------ -----------
1 MyProject 2a5566b5-7380-46b8-9598-328e092e8899 (empty)

Let’s list the workflows in the bucket referenced by the ID 1 (I’m using back reference, but you could use the full identifier for fully scripted solutions):

#> registry list-flows -b &1 -u http://nifi-registry:18080

Using a positional back-reference for 'MyProject'

# Name       Id                                   Description
- ---------- ------------------------------------ ----------------------------------------
1 MyWorkflow 9c2874ca-673a-4553-bbb1-5b370ff23b70 This a workflow to demonstrate workfl...

I now want to deploy the last version (-fv 1) of this workflow in my production NiFi:

#> nifi pg-import -b &1 -f &1 -fv 1 -u http://nifi-prod:8080

Using a positional back-reference for 'MyProject'

Using a positional back-reference for 'MyWorkflow'

76f015f8-0162-1000-5470-a7c6511e5685

My process group is now imported in the production NiFi with everything stopped since it’s the first time I deploy it. If I didn’t want to deploy it at the root level of my production NiFi I could have specified a parent process group ID in the previous command.

Let’s now list the variables of this process group:

#> nifi pg-get-vars -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -u http://nifi-prod:8080

# Name                    Value
- ----------------------- ----------------------------------------
1 HDFSconfFiles           /Users/pvillard/Documents/nifi-workdir/fieldcloud/core-site.xml,/Users/pvillard/Documents/nifi-workdir/fieldcloud/hdfs-site.xml
2 HDFSdirectory           /dev/dest/directory
3 HDFSkeytab              myproject-dev.keytab
4 HDFSprincipal           myproject-dev@EXAMPLE.COM
5 ListFilesInputDirectory /dev/tmp

I can now update the variables with the values of my production environment:

#> nifi pg-set-var -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -var HDFSconfFiles -val /etc/hadoop/conf/current/hdfs-site.xml,/etc/hadoop/conf/current/core-site.xml -u http://nifi-prod:8080
#> nifi pg-set-var -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -var HDFSdirectory -val /prod/dest/directory -u http://nifi-prod:8080
#> nifi pg-set-var -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -var HDFSkeytab -val myproject-prod.keytab -u http://nifi-prod:8080
#> nifi pg-set-var -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -var HDFSprincipal -val myproject-prod@EXAMPLE.COM -u http://nifi-prod:8080
#> nifi pg-set-var -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -var ListFilesInputDirectory -val /prod/tmp -u http://nifi-prod:8080
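
For a scripted deployment, these commands could be generated from a dictionary of production values. The sketch below only builds the argument lists and reuses the flags shown above; the cli.sh path, the helper name, and the idea of passing the command directly as arguments to cli.sh (instead of using the interactive shell) are assumptions to adapt to your setup.

```python
import shlex


def set_var_commands(cli, base_url, pg_id, variables):
    """Build one 'nifi pg-set-var' command (argument list) per variable."""
    return [
        [cli, "nifi", "pg-set-var", "-pgid", pg_id,
         "-var", name, "-val", value, "-u", base_url]
        for name, value in variables.items()
    ]


prod_values = {
    "HDFSdirectory": "/prod/dest/directory",
    "HDFSkeytab": "myproject-prod.keytab",
    "ListFilesInputDirectory": "/prod/tmp",
}

commands = set_var_commands(
    "./cli.sh",                                # assumed location of the CLI
    "http://nifi-prod:8080",
    "76f015f8-0162-1000-5470-a7c6511e5685",
    prod_values,
)

for command in commands:
    # Each list could also be handed to subprocess.run(command, check=True).
    print(shlex.join(command))
```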

I can confirm in the UI of my production NiFi that the variables have been updated:

Screen Shot 2018-03-30 at 3.11.44 PM.png

I just need to start my process group and we’re done:

#> nifi pg-start -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -u http://nifi-prod:8080

We’ve successfully deployed a new workflow in production. Let’s now update our workflow in the development NiFi and create a new version of it:

Screen Shot 2018-03-30 at 3.38.55 PM.png

I’ve added a PutElasticsearchHttp processor to also send my data into Elasticsearch. Here is the configuration of my processor:

Screen Shot 2018-03-30 at 3.08.41 PM.png

And I’ve added a new variable for the URL of my Elasticsearch instance:

Screen Shot 2018-03-30 at 3.10.50 PM.png

I can see on my process group, that I now have local changes to commit in the NiFi Registry:

Screen Shot 2018-03-30 at 3.14.41 PM.png

To commit the changes, right click, version, commit local changes:

Screen Shot 2018-03-30 at 3.15.27 PM.png

You can comment your changes before committing the changes in the NiFi Registry:

Screen Shot 2018-03-30 at 3.16.32 PM.png

My process group is now up-to-date and I can see the new version in the Registry:

Screen Shot 2018-03-30 at 3.17.33 PM.png

In production I can see that my process group is not up-to-date anymore and a new version is available:

Screen Shot 2018-03-30 at 3.18.52 PM.png

To update the production environment to the latest version, you just need to do the following with the NiFi CLI:

#> nifi pg-change-version -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -u http://nifi-prod:8080

Not only is the flow updated, but there is no downtime: running processors remain running and previously existing variables remain unchanged. You just have to update the new variable and start the process group to start the stopped PutElasticsearchHttp processor:

#> nifi pg-get-vars -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -u http://nifi-prod:8080

# Name                    Value
- ----------------------- ----------------------------------------
1 HDFSconfFiles           /etc/hadoop/conf/current/hdfs-site.xml,/etc/hadoop/conf/current/core-site.xml
2 HDFSdirectory           /prod/dest/directory
3 HDFSkeytab              myproject-prod.keytab
4 HDFSprincipal           myproject-prod@EXAMPLE.COM
5 ListFilesInputDirectory /prod/tmp
6 ElasticsearchURL        http://es-dev:9200

#> nifi pg-set-var -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -var ElasticsearchURL -val http://es-prod:9200 -u http://nifi-prod:8080
#> nifi pg-start -pgid 76f015f8-0162-1000-5470-a7c6511e5685 -u http://nifi-prod:8080

You now have the latest version of your workflow in production and you didn’t experience any downtime since you didn’t modify your source processor at all.

Screen Shot 2018-03-30 at 4.15.09 PM.png
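
If you have many variables to switch from development to production values, typing each pg-set-var by hand gets tedious. Here is a minimal Python sketch that generates the command lines used above from a dictionary. The function name build_set_var_commands is my own (hypothetical), and it only builds the commands with the -pgid, -var, -val and -u options shown above; it does not run them:

```python
import shlex


def build_set_var_commands(pg_id, nifi_url, variables):
    """Return one `nifi pg-set-var` command per variable, followed by `nifi pg-start`.

    Each command is a list of arguments; nothing is executed here.
    """
    commands = []
    for name, value in variables.items():
        commands.append(
            ["nifi", "pg-set-var", "-pgid", pg_id,
             "-var", name, "-val", value, "-u", nifi_url]
        )
    # Start the process group once all variables are set.
    commands.append(["nifi", "pg-start", "-pgid", pg_id, "-u", nifi_url])
    return commands


if __name__ == "__main__":
    # Values taken from the example above.
    for cmd in build_set_var_commands(
        "76f015f8-0162-1000-5470-a7c6511e5685",
        "http://nifi-prod:8080",
        {"ElasticsearchURL": "http://es-prod:9200"},
    ):
        print(" ".join(shlex.quote(part) for part in cmd))
```

The printed lines can be pasted into the interactive shell, or prefixed with the CLI launcher script when running in non-interactive mode.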


  • Case 2 – One NiFi Registry using NiPyAPI

Environment details:
NiFi Dev: http://nifi-dev:8080/nifi
NiFi Prod: http://nifi-prod:8080/nifi
NiFi Registry: http://nifi-registry:18080/nifi-registry

(reading case 1 first is recommended, as it gives an overview of the overall story)

The first version of my workflow has been committed in the NiFi Registry, I want to deploy it in my production NiFi for the first time.

I assume that you already have a Python environment where you can use the nipyapi library (please refer to the documentation if needed).

>>> help(nipyapi)
...
VERSION
 0.8.0

AUTHOR
 Daniel Chaffelson
...

>>> devNiFi = 'http://nifi-dev:8080/nifi-api'
>>> registry = 'http://nifi-registry:18080/nifi-registry-api'
>>> prodNiFi = 'http://nifi-prod:8080/nifi-api'

Right now there is no method available to deploy a versioned workflow as a new process group for the first time. This will be added in the next version of the library. In the meantime, you can use the option below:

def deploy_flow_version(parent_pg, location, bucketId, flowId, registryID, ver):
  assert isinstance(parent_pg, nipyapi.nifi.ProcessGroupEntity)
  assert isinstance(location, tuple)
  try:
    return nipyapi.nifi.ProcessgroupsApi().create_process_group(
      id=parent_pg.id,
      body=nipyapi.nifi.ProcessGroupEntity(
        revision=parent_pg.revision,
        component=nipyapi.nifi.ProcessGroupDTO(
          position=nipyapi.nifi.PositionDTO(
            x=float(location[0]),
            y=float(location[1])
          ),
          version_control_information=nipyapi.nifi.VersionControlInformationDTO(
            bucket_id=bucketId,
            flow_id=flowId,
            registry_id=registryID,
            version=ver
          )
        )
      )
    ).id
  except nipyapi.nifi.rest.ApiException as e:
    raise e

I can now create a process group with my versioned workflow:

>>> nipyapi.utils.set_endpoint(prodNiFi)
True
>>> nipyapi.utils.set_endpoint(registry)
True
>>> bucketName = "MyProject"
>>> workflowName = "MyWorkflow"
>>> registryName = "Registry"
>>>
>>> rootPgId = nipyapi.canvas.get_root_pg_id()
>>> rootPg = nipyapi.canvas.get_process_group(rootPgId, identifier_type='id')
>>> bucketID = nipyapi.versioning.get_registry_bucket(bucketName).identifier
>>> workflowID = nipyapi.versioning.get_flow_in_bucket(bucketID, workflowName).identifier
>>> registryID = nipyapi.versioning.get_registry_client(registryName).id
>>> ver = 1
>>> location = (200, 200)
>>>
>>> deploy_flow_version(rootPg, location, bucketID, workflowID, registryID, ver)
'01621009-0a4c-1704-e137-e564eadb11e2'

With the above list of commands we have retrieved the bucket ID, the workflow ID, the ID of the registry client in the production NiFi, and we deployed the versioned flow at the root level (using the root process group) at a given location.

The last command returns the ID of the newly created process group that contains our versioned workflow.

We can now update the variables of the process group:

>>> versionedPg = nipyapi.canvas.get_process_group('01621009-0a4c-1704-e137-e564eadb11e2', identifier_type='id')
>>> nipyapi.canvas.update_variable_registry(versionedPg, [('HDFSprincipal', 'myproject-prod@EXAMPLE.COM'), ('HDFSkeytab', 'myproject-prod.keytab'), ...])
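
The list of (name, value) tuples passed to update_variable_registry is easier to maintain if it is built from one dictionary per environment. Here is a small sketch of that idea; the helper name to_variable_updates and the required argument are my own (hypothetical), and the actual NiPyAPI call is left as a comment:

```python
def to_variable_updates(values, required=()):
    """Turn a {name: value} mapping into the list of (name, value) tuples
    used in the update_variable_registry call above.

    Raises KeyError if one of the `required` variable names is missing.
    """
    missing = [name for name in required if name not in values]
    if missing:
        raise KeyError("Missing variables: " + ", ".join(missing))
    return [(name, str(value)) for name, value in values.items()]


if __name__ == "__main__":
    # Production values taken from the example in case 1.
    prod_values = {
        "HDFSprincipal": "myproject-prod@EXAMPLE.COM",
        "HDFSkeytab": "myproject-prod.keytab",
    }
    updates = to_variable_updates(prod_values, required=["HDFSprincipal", "HDFSkeytab"])
    print(updates)
    # Then, with the process group retrieved as above:
    # nipyapi.canvas.update_variable_registry(versionedPg, updates)
```

Failing early on a missing variable avoids starting a process group that still points to a development value.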

And we now just have to start the process group:

>>> nipyapi.canvas.schedule_process_group('01621009-0a4c-1704-e137-e564eadb11e2', True)
True

Let’s now imagine that we have a new version available for our workflow and we want to update it to the latest version. Then we just need to do the following:

>>> versionedPg = nipyapi.canvas.get_process_group('01621009-0a4c-1704-e137-e564eadb11e2', identifier_type='id')
>>> nipyapi.versioning.update_flow_ver(versionedPg)

We can now update the variables if needed, and start the newly added processors as we did at the end of case 1.


  • Case 3 – Two NiFi Registries using the NiFi CLI

Environment details:
NiFi Dev: http://nifi-dev:8080/nifi
NiFi Prod: http://nifi-prod:8080/nifi
NiFi Registry Dev: http://registry-dev:18080/nifi-registry
NiFi Registry Prod: http://registry-prod:18080/nifi-registry

(reading case 1 first is recommended, as it gives an overview of the overall story)

In this case, security requirements prevent us from having a single NiFi Registry reachable from all our NiFi environments. Consequently, we have one Registry instance per environment and we have to set up a mechanism to move versioned workflows from one Registry instance to another. I will go very quickly over the basic functionalities of the CLI (described in the previous cases) and focus on the export/import between two registries.

I assume that each NiFi environment has been configured to register its Registry in the controller settings. If not, refer to the beginning of the article.

Starting point: the initial version of the workflow is committed in the Dev Registry and should be deployed in production.

I first list the buckets in my Dev Registry, list the flows in the bucket I want, and export the flow I’m looking for at the given version into a JSON file:

#> registry list-buckets -u http://registry-dev:18080

# Name      Id                                   Description
- --------- ------------------------------------ -----------
1 MyProject 2a5566b5-7380-46b8-9598-328e092e8899 (empty)

#> registry list-flows -b &1 -u http://registry-dev:18080

Using a positional back-reference for 'MyProject'

# Name       Id                                   Description
- ---------- ------------------------------------ -----------
1 MyWorkflow 3403c78c-7074-45c3-bc3b-aeac75970e85

#> registry export-flow-version -f &1 -fv 1 -o /tmp/my-flow.json -ot json -u http://registry-dev:18080

Using a positional back-reference for 'MyWorkflow'

Since this is the first time I deploy this workflow in production, it’s possible I don’t have a bucket for it yet in the Prod Registry. Let’s create one, and let’s create the workflow resource in the bucket so that we can do the import:

#> registry create-bucket -bn "MyProjectProd" -u http://registry-prod:18080

33aacd91-ca5d-4405-a657-05de37da1fb7

#> registry list-buckets -u http://registry-prod:18080

# Name          Id                                   Description
- ------------- ------------------------------------ -----------
1 MyProjectProd 33aacd91-ca5d-4405-a657-05de37da1fb7 (empty)

#> registry create-flow -b &1 -fn "MyWorkflowProd" -u http://registry-prod:18080

Using a positional back-reference for 'MyProjectProd'

67576995-fb0f-4324-987a-dfcf186a24c8

We can now import the versioned workflow that we exported as JSON file from the Dev Registry into the Prod Registry:

#> registry list-buckets -u http://registry-prod:18080

# Name          Id                                   Description
- ------------- ------------------------------------ -----------
1 MyProjectProd 33aacd91-ca5d-4405-a657-05de37da1fb7 (empty)

#> registry list-flows -b &1 -u http://registry-prod:18080

Using a positional back-reference for 'MyProjectProd'

# Name           Id                                   Description
- -------------- ------------------------------------ -----------
1 MyWorkflowProd 67576995-fb0f-4324-987a-dfcf186a24c8 (empty)

#> registry import-flow-version -f &1 -i /tmp/my-flow.json -u http://registry-prod:18080

Using a positional back-reference for 'MyWorkflowProd'

1

Now the versioned workflow is available in the Prod Registry, and we can proceed exactly as we did in case 1 to deploy it in the production NiFi.

Assuming a new version of the workflow is available in the Dev Registry, you just need to export the version of your choice (using the option -fv) and import it in the existing bucket/workflow in the Prod Registry. The version on the Prod Registry side will be automatically incremented. One benefit of this approach/separation with two registries is that you can move from one registry to another *only* the versions that are considered ready enough to be promoted in the next environment. The full lifecycle workflow could be represented like this:

Screen Shot 2018-03-30 at 6.50.15 PM.png

Using explanations in both case 1 and 3, you should be able to manage the complete life cycle deployment of your workflows using the CLI.


  • Case 4 – Two NiFi Registries using the NiPyAPI

Environment details:
NiFi Dev: http://nifi-dev:8080/nifi
NiFi Prod: http://nifi-prod:8080/nifi
NiFi Registry Dev: http://registry-dev:18080/nifi-registry
NiFi Registry Prod: http://registry-prod:18080/nifi-registry

(reading case 1 first is recommended, as it gives an overview of the overall story)

In this case, security requirements prevent us from having a single NiFi Registry reachable from all our NiFi environments. Consequently, we have one Registry instance per environment and we have to set up a mechanism to move versioned workflows from one Registry instance to another. I will go very quickly over the basic functionalities of NiPyAPI (described in the previous cases) and focus on the export/import between two registries.

I assume that each NiFi environment has been configured to register its Registry in the controller settings. If not, refer to the beginning of the article.

Starting point: the initial version of the workflow is committed in the Dev Registry and should be deployed in production.

I export the flow I’m looking for, at the given version (in this case, version 1), into a JSON file:

>>> devNiFi = 'http://nifi-dev:8080/nifi-api'
>>> devRegistry = 'http://registry-dev:18080/nifi-registry-api'
>>> prodNiFi = 'http://nifi-prod:8080/nifi-api'
>>> prodRegistry = 'http://registry-prod:18080/nifi-registry-api'
>>> bucketName = "MyProject"
>>> workflowName = "MyWorkflow"

>>> nipyapi.utils.set_endpoint(devRegistry)
>>> bucketID = nipyapi.versioning.get_registry_bucket(bucketName).identifier
>>> workflowID = nipyapi.versioning.get_flow_in_bucket(bucketID, workflowName).identifier
>>> nipyapi.versioning.export_flow_version(bucketID, workflowID, version='1', file_path='/tmp/my-flow.json', mode='json')

Since this is the first time I deploy this workflow in production, it’s possible I don’t have a bucket for it yet in the Prod Registry. Let’s create one so that we can do the import:

>>> nipyapi.utils.set_endpoint(prodRegistry)
True
>>> nipyapi.utils.set_endpoint(prodNiFi)
True
>>> bucketNameProd = "MyProjectProd"
>>> nipyapi.versioning.create_registry_bucket(bucketNameProd)
{'created_timestamp': 1522430434276,
 'description': None,
 'identifier': '8a9e46b0-722e-40f6-9401-98103de56435',
 'link': {'params': {'rel': 'self'},
 'rel': None,
 'rels': None,
 'title': None,
 'type': None,
 'uri': None,
 'uri_builder': None},
 'name': 'MyProjectProd',
 'permissions': {'can_delete': True, 'can_read': True, 'can_write': True}}
>>> bucketProdID = nipyapi.versioning.get_registry_bucket(bucketNameProd).identifier

I can now import my versioned workflow in this bucket:

>>> workflowNameProd = "MyWorkflowProd"
>>> nipyapi.versioning.import_flow_version(bucketProdID, encoded_flow=None, file_path='/tmp/my-flow.json', flow_name=workflowNameProd, flow_id=None)

Now the versioned workflow is available in the Prod Registry, and we can proceed exactly as we did in case 2 to deploy it in the production NiFi.

Let’s assume you make modifications in the Dev NiFi and you want to deploy the new version of the workflow in the existing bucket of the Prod Registry. You would then do the following (assuming you have already done the export, as before, to /tmp/my-flow.json):

>>> workflowProdID = nipyapi.versioning.get_flow_in_bucket(bucketProdID, workflowNameProd).identifier
>>> nipyapi.versioning.import_flow_version(bucketProdID, encoded_flow=None, file_path='/tmp/my-flow.json', flow_name=None, flow_id=workflowProdID)

And you can now update the process group of the production NiFi to the latest version using the commands already described in case 2.
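
The only difference between the first import and the following ones is which of flow_name and flow_id is set. Here is a small sketch that captures this rule so that a single script can handle both situations. The helper name import_arguments is my own (hypothetical); it only builds the keyword arguments used in the two import_flow_version calls above and does not contact the Registry:

```python
def import_arguments(file_path, flow_name, existing_flow_id=None):
    """Build the keyword arguments for import_flow_version.

    First import (no flow yet in the bucket): pass the name, no ID.
    Later imports (flow already exists): pass the ID, no name.
    """
    if existing_flow_id is None:
        return {
            "encoded_flow": None,
            "file_path": file_path,
            "flow_name": flow_name,
            "flow_id": None,
        }
    return {
        "encoded_flow": None,
        "file_path": file_path,
        "flow_name": None,
        "flow_id": existing_flow_id,
    }


if __name__ == "__main__":
    print(import_arguments("/tmp/my-flow.json", "MyWorkflowProd"))
    # With NiPyAPI, the result would be used like this:
    # nipyapi.versioning.import_flow_version(bucketProdID, **import_arguments(...))
```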


  • Using the CLI in a non-interactive mode

In your deployments you would probably use the CLI in non-interactive mode; refer to the CLI documentation for more details.

I create 4 configuration files:

nifi-dev.properties

baseUrl=http://nifi-dev:8080
keystore=
keystoreType=
keystorePasswd=
keyPasswd=
truststore=
truststoreType=
truststorePasswd=
proxiedEntity=

nifi-prod.properties

baseUrl=http://nifi-prod:8080
keystore=
keystoreType=
keystorePasswd=
keyPasswd=
truststore=
truststoreType=
truststorePasswd=
proxiedEntity=

registry-dev.properties

baseUrl=http://registry-dev:18080
keystore=
keystoreType=
keystorePasswd=
keyPasswd=
truststore=
truststoreType=
truststorePasswd=
proxiedEntity=

registry-prod.properties

baseUrl=http://registry-prod:18080
keystore=
keystoreType=
keystorePasswd=
keyPasswd=
truststore=
truststoreType=
truststorePasswd=
proxiedEntity=
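
Since the four files only differ by their baseUrl, they can also be generated. Here is a minimal sketch, assuming the same list of keys as in the files above; the function names render_properties and write_environment_files are my own (hypothetical), and the Registry URLs in the example use port 18080 to match the environment details listed earlier:

```python
from pathlib import Path

# Keys copied from the properties files shown above.
PROPERTY_KEYS = [
    "baseUrl", "keystore", "keystoreType", "keystorePasswd", "keyPasswd",
    "truststore", "truststoreType", "truststorePasswd", "proxiedEntity",
]


def render_properties(base_url, **overrides):
    """Return the content of one properties file; unset keys stay empty."""
    values = dict.fromkeys(PROPERTY_KEYS, "")
    values["baseUrl"] = base_url
    for key, value in overrides.items():
        if key not in values:
            raise KeyError("Unknown property: " + key)
        values[key] = value
    return "".join(f"{key}={values[key]}\n" for key in PROPERTY_KEYS)


def write_environment_files(target_dir, base_urls):
    """Write <name>.properties for each entry of base_urls; return the paths."""
    target = Path(target_dir)
    target.mkdir(parents=True, exist_ok=True)
    written = []
    for env_name, base_url in base_urls.items():
        path = target / f"{env_name}.properties"
        path.write_text(render_properties(base_url))
        written.append(path)
    return written


if __name__ == "__main__":
    # Print one file instead of writing to disk.
    print(render_properties("http://registry-dev:18080"))
```

Calling write_environment_files with a dictionary such as {"nifi-dev": "http://nifi-dev:8080", "registry-dev": "http://registry-dev:18080"} writes one file per entry in the given directory.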

I can now use the following commands to automatically export/import a given bucket, workflow, version from the Dev Registry to the Prod Registry. In order to ease the process, I’m specifying “json” as the output type, and I’m using the jq command to parse the results.

#!/bin/sh

set -e

# Set the variables
BUCKET="MyProject"
WORKFLOW="MyWorkflow"
BUCKETPROD="MyProjectProd"
WORKFLOWPROD="MyWorkflowProd"
VERSION=1
FILE="/tmp/my-flow.json"

set_endpoints() {
 ./cli.sh session set nifi.props $1
 ./cli.sh session set nifi.reg.props $2
}

get_bucketid() {
 result=$(./cli.sh registry list-buckets -ot json | jq '.[] | select(.name=="'$1'") | .identifier')
 if [ -z "$result" ]; then
   >&2 echo "No bucket with name $1"
   return 1
 else
   echo $result
 fi
}

get_workflowid() {
 result=$(./cli.sh registry list-flows -b $1 -ot json | jq '.[] | select(.name=="'$2'") | .identifier')
 if [ -z "$result" ]; then
   >&2 echo "No workflow with name $2 in bucket $1"
   return 1
 else
   echo $result
 fi
}

# Set the endpoints
set_endpoints "nifi-dev.properties" "registry-dev.properties"

# Export the workflow
BUCKETID=$(get_bucketid $BUCKET)
WORKFLOWID=$(get_workflowid $BUCKETID $WORKFLOW)
./cli.sh registry export-flow-version -f $WORKFLOWID -fv $VERSION -o $FILE -ot json

# Change the endpoints
set_endpoints "nifi-prod.properties" "registry-prod.properties"

# Import the workflow
BUCKETID=$(get_bucketid $BUCKETPROD)
WORKFLOWID=$(get_workflowid $BUCKETID $WORKFLOWPROD)
./cli.sh registry import-flow-version -f $WORKFLOWID -i $FILE

I’m sure you are all set to script all the operations you need.
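
If you would rather drive the CLI from Python than from a shell script, the jq lookup can be replaced with a few lines of standard library code. Here is a sketch, assuming (as the jq filters above imply) that the JSON output of list-buckets and list-flows is a list of objects with "name" and "identifier" fields. The function name find_identifier is my own (hypothetical), and the sample JSON is hand-written from the IDs shown earlier, not real CLI output:

```python
import json


def find_identifier(listing_json, name):
    """Return the identifier of the single entry called `name`.

    `listing_json` is the JSON text of a listing, assumed to be a list of
    objects with "name" and "identifier" fields.
    """
    matches = [
        item["identifier"]
        for item in json.loads(listing_json)
        if item.get("name") == name
    ]
    if not matches:
        raise LookupError("No entry with name " + name)
    if len(matches) > 1:
        raise LookupError("Several entries with name " + name)
    return matches[0]


if __name__ == "__main__":
    sample = json.dumps([
        {"name": "MyProject", "identifier": "2a5566b5-7380-46b8-9598-328e092e8899"},
    ])
    print(find_identifier(sample, "MyProject"))
```

Unlike the shell version, this also fails when two entries share the same name instead of silently returning both identifiers.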

Also, be aware that there is a command

registry transfer-flow-version

that you can use to do the export/import I described above:

#> registry transfer-flow-version help

Transfers a version of a flow directly from one Registry to another,
without needing to export/import. If --sourceProps is not specified,
the source flow is assumed to be in the same registry as the
destination flow. If --sourceFlowVersion is not specified, then the
latest version will be transferred

Also, instead of using the ‘session’ command to set the endpoint, you can directly use the properties file in the command using the ‘-p’ option:

./bin/cli.sh registry list-buckets -p /path/to/local-nifi-registry.properties

Using the ‘session’ command will change the default settings of the interactive shell for the next time you’re using it.


  • What is next? Are there some limitations?

There are already a few JIRAs open to improve the CLI, and Dan Chaffelson is very active and keen to improve his library. Both will definitely improve over time and, as I said, the Registry will in the future store much more than just workflows.

If you want to see new features in the CLI, feel free to open a JIRA here or send an email to the users mailing list of the Apache NiFi project. If you want to see improvements in the Python library, you can quickly and easily interact on the GitHub repository.

A few limitations to keep in mind right now:

  1. Support for embedded versioned process groups is not fully ready yet when you have multiple Registry instances and you need to export/import things between the instances. You can check the status on NIFI-5029.
  2. When versioning a process group and deploying it in NiFi, all the sensitive properties are emptied for security reasons. Just like the variables, you would need to set the values using the REST API or NiPyAPI (no support in the CLI yet) when you deploy the workflow for the first time (or when adding new sensitive properties between two versions). Regarding support in the CLI, you can check the status on NIFI-5028.
  3. The example I gave is not using any controller service defined at process group level. If you’re using controller services, they will be stopped when the versioned workflow is deployed for the first time. Using pg-start won’t start the controller services; they need to be started before you can actually start the process group. There is no support for this in the CLI yet, but you can use the REST API or NiPyAPI. Regarding support in the CLI, you can check the status on NIFI-5027.
    >> Note: The improvement has been merged in master code of NiFi but didn’t make it in NiFi 1.6.0. You can, however, build the CLI from master code and use it with NiFi 1.6.0.

There is much more on the roadmap, so stay tuned!

I hope this overview already helps you a lot in automating the Flow Development Life Cycle (FDLC). It should integrate nicely into your DevOps pipelines and make your life much easier. Keep in mind that a lot of development is currently in progress and that this article could quickly become outdated. I’ll do my best to keep it up to date or to post new articles when major new features arrive.

As usual, thanks for reading, and feel free to comment/ask questions.