Monitoring Driven Development with NiFi 1.7+

I already discussed the Monitoring Driven Development (MDD) approach in a previous post and covered some monitoring topics around NiFi in a few others (reading them would probably help with this one, or at least refresh your memory on some concepts). In this post I want to present the MDD approach once again, using some new features of NiFi 1.7 that make things a bit easier and cleaner.

What do I mean when talking about MDD in NiFi? When I say MDD, I refer to the fact that, when a team is developing a “core workflow” for a use case, the team should also develop a “monitoring workflow” that will be in charge of monitoring and reporting the health of the “core workflow” based on their business requirements.

How does this work in NiFi? This approach completely relies on two things:

  • A proper naming convention for the components used in the workflows
  • The use of the Site-to-Site Reporting Tasks (see here)

Basically, the idea is to set up the S2S reporting tasks to send data to an input port dedicated to monitoring. Basic routing and filtering is then performed, assuming projects follow the naming convention, so that all the data related to a given “core workflow” is automatically routed to the associated “monitoring workflow”. This way, it’s up to each project to implement the monitoring they want for their use case based on the requirements and SLA they have.

What’s new in NiFi that’s going to help me with MDD?

There are quite a few interesting JIRAs, but the two most important ones are:

  • [NIFI-4814] – Add distinctive attribute to S2S reporting tasks (NiFi 1.6.0)
  • [NIFI-5122] – Add record writer to S2S Reporting Tasks (NiFi 1.7.0)

Example

For my example, the naming convention is the following: any component that should be monitored must be named with a prefix ABC_, where ABC is the three-letter abbreviation of the use case/project.

I have two projects, one named PJA and one named PJB.

Let’s say project A is using HandleHttpRequest/Response to receive XML data over HTTP and has two monitoring requirements:

  • Send an email to the team in case a bulletin is generated
  • Monitor in Grafana the amount of valid/invalid data being processed

Let’s say project B is using ListenTCPRecord for a log ingestion use case and has one monitoring requirement:

  • Send an email to the team in case back pressure is enabled

The development of the workflows is outside the scope of this post. I’ll keep things simple as the objective is to demonstrate the MDD approach, not how to implement the workflows of the specific use cases I chose.

A *very basic* implementation of the “core workflow” of PJA could be:

Screen Shot 2018-08-29 at 3.55.19 PM.png

A *very basic* implementation of the “core workflow” of PJB could be:

Screen Shot 2018-08-29 at 4.36.29 PM

Now I’m going to configure the S2S Reporting Tasks: one for Controller Status data and one for Bulletins data. Note that there is a new feature allowing users to select a Record Writer for the generated data. I’m going to use the Avro one for the reporting tasks so that I don’t have to care about the schemas at all. For that I need to create a Controller Service (in the Controller Settings menu, since this controller service is exclusively used by reporting tasks).

Screen Shot 2018-08-29 at 5.02.26 PM.png

After enabling the CS, I can configure my reporting tasks.

The one for bulletins:

Screen Shot 2018-08-29 at 5.03.59 PM

And the one for controller status:

Screen Shot 2018-08-29 at 5.05.21 PM

Note that additional S2S Reporting Tasks are available if needed. The one about provenance events can be particularly useful for some monitoring requirements such as latency.

At the root level of my canvas, I have something like:

Screen Shot 2018-08-29 at 5.45.38 PM.png

I’m receiving all the monitoring data sent by the reporting tasks in the ‘monitoring’ input port. Then in my ‘Monitoring’ process group, I have something looking like:

Screen Shot 2018-08-29 at 5.46.48 PM.png

Notice that I’m sending all the data to ‘Technical Monitoring’. That’s because the team in charge of administering and monitoring the platform probably also wants to look at the monitoring data to ensure the full system is performing OK (common examples: send the bulletins to an external tool like ES/Kibana, send email alerts in case back pressure is enabled on a connection, etc).

The upper part of the workflow is where I split the monitoring data and route it by project. First I’m using a RouteOnAttribute to route the data based on the type (bulletins, controller status, etc):

Screen Shot 2018-08-29 at 5.50.48 PM

Then I’m using an UpdateRecord processor to add a ‘project’ field that I’m computing based on the component name.

For bulletins:

Screen Shot 2018-08-29 at 5.55.34 PM

For controller status:

Screen Shot 2018-08-29 at 5.52.43 PM.png
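To make the logic of that ‘project’ field explicit, here is a minimal sketch in plain Python (outside NiFi) of what the UpdateRecord step computes. It assumes the naming convention of this example (three uppercase letters followed by an underscore). The helper names and the sample field name `componentName` are assumptions for illustration, so check the schemas of your reporting tasks for the actual field carrying the component name.

```python
import re
from typing import Optional

# Assumed convention from this example: three uppercase letters, then an underscore.
PROJECT_PREFIX = re.compile(r"^([A-Z]{3})_")


def project_of(component_name: str) -> Optional[str]:
    """Return the project prefix of a component name, or None if it does not follow the convention."""
    match = PROJECT_PREFIX.match(component_name)
    return match.group(1) if match else None


def add_project_field(record: dict, name_field: str) -> dict:
    """Return a copy of the record with a 'project' field derived from the component name."""
    enriched = dict(record)
    # A missing or null name yields project = None, i.e. "not part of any project".
    enriched["project"] = project_of(record.get(name_field) or "")
    return enriched


# 'componentName' is a hypothetical field name used for illustration.
sample = {"componentName": "PJA_ValidateRecord"}
print(add_project_field(sample, "componentName"))
```

Components that do not follow the convention simply end up without a project, which is what lets the later routing step ignore them.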

Since the data has been sent in Avro format by the reporting task, I’m using an Avro reader Controller Service with default settings. And, from now on, I want the data to be in JSON (as it’ll be easier in case I want to send the data via an email or via HTTP). I’m configuring a few controller services: one Avro Schema Registry containing the schemas (that you can retrieve in the additional details of the reporting task documentation, see below), one JSON Reader and one JSON Writer.

For instance, to get the schema of the S2S Bulletin Reporting Task, go to the reporting task list:

Screen Shot 2018-08-29 at 5.59.40 PM.png

Click on the documentation icon on the left:

Screen Shot 2018-08-29 at 6.00.21 PM.png

And then click Additional Details:

Screen Shot 2018-08-29 at 6.00.58 PM.png

When adding the schemas in the Avro Schema Registry, use the type of the reporting task as the name of the schema and don’t forget to add a ‘project’ field in the schemas:

Screen Shot 2018-08-29 at 6.02.28 PM.png

Now you can easily reference the schemas to use in the configuration of the JSON Reader/Writer using the expression language because the flow files generated by the reporting tasks have an attribute with the type of the source reporting task.

Example of attributes on a flow file generated by a RT:

Screen Shot 2018-08-29 at 6.28.56 PM.png

JSON Reader:

Screen Shot 2018-08-29 at 6.03.44 PM.png

JSON Writer:

Screen Shot 2018-08-29 at 6.04.13 PM.png

Now that I have a ‘project’ field in my data containing the project name (provided the naming convention has been followed), I can use a QueryRecord processor to route the data for each project:

Screen Shot 2018-08-29 at 6.07.17 PM.png

And we can confirm that only the data for project A is routed to the ‘Monitoring Workflow’ for project A. Looking at the data in the relationship for PJA:

Screen Shot 2018-08-29 at 6.08.53 PM.png

Now, when deploying a new project for the first time (i.e. importing the process group of the “core workflow”), you just need to update the QueryRecord processor to add a dynamic property for the new project and deploy the process group for the “monitoring workflow” of the project. In combination with the NiFi Registry, it makes things really smooth when you need to update the workflows with a new version.
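As an illustration of what “one dynamic property per project” could look like, here is a small sketch generating the property name and SQL statement for each project. The exact query used in my flow is in the screenshot above; the statement below is an assumed equivalent (QueryRecord exposes the incoming records as a table named FLOWFILE), and the function name is hypothetical.

```python
import re

# Project names are restricted to the convention of this example (three uppercase letters),
# which also keeps arbitrary text out of the generated SQL.
PROJECT_NAME = re.compile(r"[A-Z]{3}")


def query_record_properties(projects):
    """Map each project to the SQL of the dynamic property (and relationship) named after it."""
    properties = {}
    for project in projects:
        if not PROJECT_NAME.fullmatch(project):
            raise ValueError(f"'{project}' does not follow the naming convention")
        properties[project] = f"SELECT * FROM FLOWFILE WHERE project = '{project}'"
    return properties


for name, sql in query_record_properties(["PJA", "PJB"]).items():
    print(name, "->", sql)
```

Onboarding a new project then boils down to adding one more entry and connecting the new relationship to the project’s monitoring process group.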

I’m not going to develop the monitoring workflows for PJA and PJB, but it’d only be a matter of a few record processors to filter the data (bulletin? back pressure enabled?), followed by a processor to send the data (ElasticSearch, PutEmail, InvokeHttp, etc):

  • “send an email when a bulletin is generated” – RouteOnAttribute to get the bulletins data and PutEmail to send an email
  • “monitor the number of valid/invalid XML files being processed” – filter the controller status statistics on the ‘Connection’ elements and keep only the statistics of the connections after the ValidateRecord processor, this can be easily done with a QueryRecord processor. The numbers can then be sent to an external system such as ElasticSearch for a dashboard in Kibana/Grafana
  • “send an email when back pressure is enabled” – use a QueryRecord processor on the controller status data by querying the field ‘isBackPressureEnabled’ and then use PutEmail processor.
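To give an idea of the filtering involved in the last requirement, here is a sketch in plain Python of the condition a QueryRecord processor would express. The ‘isBackPressureEnabled’ field is the one mentioned above; `componentType` and `componentName` are assumed field names, and the value is compared as text because it may be serialized as a string rather than a boolean depending on the schema.

```python
def back_pressure_alerts(status_records):
    """Return the names of the connections currently reporting back pressure."""
    alerts = []
    for record in status_records:
        # Only 'Connection' elements carry back pressure information.
        if record.get("componentType") != "Connection":
            continue
        # Accept both a boolean True and the string "true".
        if str(record.get("isBackPressureEnabled")).lower() == "true":
            alerts.append(record.get("componentName"))
    return alerts


# Hypothetical records for illustration only.
records = [
    {"componentType": "Connection", "componentName": "PJB_success", "isBackPressureEnabled": "true"},
    {"componentType": "Connection", "componentName": "PJB_failure", "isBackPressureEnabled": "false"},
    {"componentType": "Processor", "componentName": "PJB_ListenTCPRecord"},
]
print(back_pressure_alerts(records))
```

A non-empty result is what would trigger the PutEmail processor in the monitoring workflow.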

The full workflow of this example is available as a gist here. Note that, since it’s a template, it does not contain the Reporting Tasks configuration. But you should be up and running quite quickly with the above screenshots.

As always, thanks for reading this post, feel free to comment and/or ask questions. In case you have suggestions to make things even better, just let me know and, maybe, I’ll have to publish a new version of this post to present the new features of a next release 😉

Discussion around Ranger policies for HDFS

OK… so… Apache Ranger is the piece you want to use to define authorizations in your cluster (and, most importantly, get all the audits coming with all the policies you define). Not only for HDFS but for all the components you’re using. In this post I just want to discuss policies to secure how HDFS is used.

More specifically, here is my scenario:

  • I have a project called ‘myProject’ consisting of a Spark job accessing and processing sensitive data sitting in HDFS and the result of this processing is written back into HDFS as well. This data is so sensitive that users should not be able to access it unless specific permissions are granted. The job is running with the user ‘myProjectUser’.

Now… let’s say the team working on the project does not have permissions to access the production data and has mock datasets for development and staging environments. OK great, that’s the usual situation. BUT… we have a rogue user, John Doe, in the development team and, somehow, he is able to introduce some lines of code so that the Spark job writes the result in two places: the expected location where results should be written, and another unexpected place that John Doe can access on the production cluster. How can you prevent that?

Well… the immediate answer is a proper development process with pull requests and peer reviewing on any commit pushed to the code repository you’re using. But… let’s say that’s not enough, or, that two rogue users agreed to commit that code through a pull request. How can you set the Ranger policies to prevent this specific scenario?

Note – for this post, when I’m saying a folder has hdfs:hadoop:755, it means that chmod 755 and chown hdfs:hadoop have been set on that folder.

Let’s talk quickly about how Ranger works and, more specifically, how it works with HDFS. By default, Ranger allows you to define “allow” policies. Meaning that you grant specific access (read, write, execute) to a specific resource (file, folder) in HDFS. But something you need to recall is that, by default, HDFS ACLs will apply in combination with Ranger policies (both are used to check whether a user can access a resource). It means that, if you have:

  • a folder /myFolder with hdfs:hadoop:755 in HDFS
  • an “allow” policy saying that people belonging to group “sales” have RWX access to /myFolder

In reality, this “allow” policy does not prevent any access from people not belonging to the “sales” group. Since chmod is 755 anyone having access to HDFS can read the data inside that folder.

However, if you set chmod 000 to that folder, then, only the “allow” policy will apply and only people belonging to the “sales” group will have access to the folder.

Note – that’s not entirely true since there is one exception: the HDFS superusers you defined. Any HDFS superuser can access any folder. When sending a request to the Name Node (NN), the NN will check the identity of the requester and, if it’s a superuser, it will not go through the authorization parts of the code. If you want to prevent HDFS superusers from accessing some data, then you probably want to look at Ranger KMS, which helps ensure segregation of duties by encrypting the data in HDFS using encryption keys only available to specific users.

Note – it’s possible to prevent Ranger from falling back on HDFS ACLs by setting
xasecure.add-hadoop-authorization=false
but it means you have to set Ranger policies for everything. That’s doable but can require some effort to do properly.

Now that you’re aware of that behavior, you probably want to look at this post with some best practices about Ranger and HDFS. The first thing I’d recommend is to change the default umask from 022 to 077 to ensure that, by default, a newly created file/directory can only be read by the owner unless a Ranger policy states otherwise.
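To see what the umask change means in practice, here is the arithmetic as a short sketch: the umask bits are removed from the mode requested at creation time (777 for directories, 666 for files). The function name is only for illustration.

```python
def effective_mode(requested: int, umask: int) -> int:
    """Permissions actually applied: the requested mode minus the bits set in the umask."""
    return requested & ~umask


print(oct(effective_mode(0o777, 0o022)))  # directory with umask 022 -> 0o755
print(oct(effective_mode(0o777, 0o077)))  # directory with umask 077 -> 0o700
print(oct(effective_mode(0o666, 0o077)))  # file with umask 077      -> 0o600
```

With 077, group and others get nothing by default, so any access beyond the owner has to come from a Ranger policy.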

Then, you want to take care of some specific folders in HDFS such as the ones under /apps (folders used by Hive, HBase, Zeppelin, etc). The idea is to set chmod 000 on the folders so that HDFS ACLs do not apply and only Ranger policies are enforced. Let’s say you have /apps/hive/warehouse where all your Hive databases are stored. If you set chmod 000 on /apps/hive, then no one can go inside a subfolder of /apps/hive unless there is an “allow” policy in Ranger for that user.

OK, that’s great… but let’s go back to our scenario where we have a rogue user… Our John Doe can SSH to an edge node on the production cluster where he has his home directory on HDFS (/user/jdoe with jdoe:jdoe:700). We could say that we are safe: the job running as myProjectUser does not have permissions to write in /user/jdoe. But there is nothing preventing jdoe from changing the permissions on his home directory with a chmod 777 (however, only an HDFS superuser can execute a chown command to change ownership of a directory). Once John Doe has changed the permissions, the job can write into that folder and then John can do whatever he wants with the data… Even though you’re defining “allow” policies in Ranger.

Let’s have a look at the folders potentially exposing this issue. We have:

  • /user
  • /tmp
  • /app-logs
  • /mr-history
  • /spark-history
  • /spark2-history

The above folders (except /user) have, by default, a chmod 777 technically allowing anyone to create a folder there and use that folder for an unexpected purpose. Let’s have a look and let’s try to ensure no one can create something we don’t want.

Example – /spark-history & /spark2-history (part 1)

That’s where the Spark History Server(s) are going to store data about ongoing Spark jobs, and completed jobs based on retention policies you defined. If we look at the structure:

/spark-history is spark:hadoop:777

and then we have folders such as:

/spark-history/<folder> with <user running the job>:hadoop:770

Folder names follow a naming convention, but we don’t really care about it here.

The objective is to get rid of the chmod 777 at the top level BUT… the issue here is that a user running a job needs the right to create a directory. So, basically, anyone needs the right to create a directory in /spark-history. I can’t just set chmod 000 and define “allow” policies… And that’s where Ranger introduced a very nice feature: the “deny” policies.

The “deny” policies in Ranger

If you want all the details, have a look here and here. The main idea is to add the option, when defining a policy, to:

  • exclude groups or users from the allow policy
  • deny access to users/groups on the resource
  • exclude groups or users from the deny policy

Then, here is how the policy is evaluated in the authorizer:

Ranger-Policy-Evaluation-Flow.png

Basically, if there is no “deny” policy, the most permissive access between Ranger policies and HDFS ACLs will be used to grant access to the resource. In other words, unless you specifically defined a “deny” policy that applies for the accessed resource, the HDFS ACLs will always be considered for authorization.
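The evaluation order can be sketched as a toy model. This is a simplification under stated assumptions, not Ranger’s implementation: it considers a single policy already matched to the resource, ignores conditions and auditing, and represents the HDFS permission check as a plain boolean. All class and function names are hypothetical.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set


@dataclass
class PolicyItem:
    """One row of a policy: who it applies to and for which access types."""
    users: Set[str] = field(default_factory=set)
    groups: Set[str] = field(default_factory=set)
    accesses: Set[str] = field(default_factory=lambda: {"read", "write", "execute"})

    def matches(self, user: str, user_groups: Set[str], access: str) -> bool:
        if access not in self.accesses:
            return False
        # "public" is the convention meaning anyone.
        return user in self.users or "public" in self.groups or bool(self.groups & user_groups)


@dataclass
class Policy:
    allow: List[PolicyItem] = field(default_factory=list)
    allow_exceptions: List[PolicyItem] = field(default_factory=list)
    deny: List[PolicyItem] = field(default_factory=list)
    deny_exceptions: List[PolicyItem] = field(default_factory=list)


def is_allowed(policy: Optional[Policy], user: str, user_groups: Set[str],
               access: str, hdfs_allows: bool) -> bool:
    """Deny first, then allow, then fall back on the HDFS permissions."""
    def hit(items: List[PolicyItem]) -> bool:
        return any(item.matches(user, user_groups, access) for item in items)

    if policy is not None:
        if hit(policy.deny) and not hit(policy.deny_exceptions):
            return False
        if hit(policy.allow) and not hit(policy.allow_exceptions):
            return True
    return hdfs_allows


# An "allow"-only policy does not stop anyone once the folder itself is permissive.
allow_only = Policy(allow=[PolicyItem(groups={"sales"})])
print(is_allowed(allow_only, "jdoe", {"dev"}, "read", hdfs_allows=True))

# Adding a deny for everyone, with an exception, closes that door.
locked = Policy(
    allow=[PolicyItem(groups={"hadoop"})],
    deny=[PolicyItem(groups={"public"})],
    deny_exceptions=[PolicyItem(groups={"hadoop"})],
)
print(is_allowed(locked, "jdoe", {"dev"}, "write", hdfs_allows=True))
```

The first call returns True purely because of the fallback, which is exactly the behavior the rogue user relies on; the second returns False regardless of what the HDFS permissions say.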

Enabling the “deny” policies

By default, the “deny” policies are not available in the Ranger UI. It’s mainly because the concept can be hard to understand and things can quickly become a mess when using the “deny” policies. Also, for most of the users, this feature will never be used. One needs to be careful and rigorous when using that feature.

To enable it, retrieve the HDFS service definition from the Ranger Admin server:

$ curl -k -u <user> https://<ranger_admin_server>:6182/service/public/v2/api/servicedef/1 > hdfs.json

Then, have a look at the content:

$ cat hdfs.json
{...,"isEnabled":true,"name":"hdfs","options":{"enableDenyAndExceptionsInPolicies":"false"}...}

In the “options” field, you can add (or modify it if it’s already there):

"options":{"enableDenyAndExceptionsInPolicies":"true"}

Then you just have to update the service definition:

$ curl -k -u <user> -X PUT -H "Accept: application/json" -H "Content-Type: application/json" -d @hdfs.json https://<ranger_admin_server>:6182/service/public/v2/api/servicedef/1
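If you prefer not to edit hdfs.json by hand between the two curl calls, the change can be scripted. Here is a minimal sketch that only touches the “options” field and leaves the rest of the service definition as retrieved; the function names are hypothetical.

```python
import json
from pathlib import Path


def enable_deny_policies(service_def: dict) -> dict:
    """Return a copy of the service definition with deny/exception policies enabled."""
    updated = dict(service_def)
    options = dict(updated.get("options") or {})
    # The value is the string "true", as in the JSON returned by Ranger.
    options["enableDenyAndExceptionsInPolicies"] = "true"
    updated["options"] = options
    return updated


def update_file(path: str) -> None:
    """Rewrite the downloaded service definition file in place."""
    file = Path(path)
    service_def = json.loads(file.read_text())
    file.write_text(json.dumps(enable_deny_policies(service_def)))


print(enable_deny_policies({"name": "hdfs", "isEnabled": True, "options": {}}))
```

Calling `update_file("hdfs.json")` after the first curl command produces the file to send back with the PUT request.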

Variables in Ranger

In addition to the “deny” policies (introduced in Ranger 0.6), we’re going to use the variables (introduced in Ranger 0.7). We can use:

  • {OWNER} – owner of the resource
  • {USER} – user accessing the resource

It’s a very convenient way to define policies without naming specific users.

Example – /spark-history & /spark2-history (part 2)

We now have the tools to secure our /spark-history folder by creating the following rules:

  • on /spark-history
    • Allow policy for group “public”, RWX permissions, non-recursive. This rule allows us to set chmod 000 in HDFS to keep things clean.

policy_spark_folder.png

  • on /spark-history/*
    • Allow policy for group “hadoop” and user “{OWNER}”, RWX permissions.
    • Deny policy for group “public”, RWX permissions. For any resource in /spark-history, no one has access.
    • Exclude deny policy for group “hadoop” and user “{OWNER}”, RWX permissions. For any resource in /spark-history, only users of the “hadoop” group (such as spark), and owner of the resources have RWX permissions.

policy_spark_content.png

Note – “public” group is a convention meaning anyone.
Note – same approach applies for /spark2-history

With the policies we defined, anyone can create a folder, but only the owner of that folder can access the data inside, or write data inside. We have now secured our /spark-history and /spark2-history folders.
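For those who would rather create these policies through the REST API than through the UI, here is a sketch building the two policy definitions as JSON. It follows the policy model of Ranger’s public v2 API as I understand it (policyItems, denyPolicyItems, denyExceptions); the service name `cluster_hadoop`, the policy names, the helper names and the recursive flag on the content policy are assumptions, so compare with a policy exported from your own Ranger before using it.

```python
import json

RWX = ("read", "write", "execute")


def item(users=(), groups=(), accesses=RWX):
    """Build one policy item granting (or denying) the given access types."""
    return {
        "users": list(users),
        "groups": list(groups),
        "accesses": [{"type": access, "isAllowed": True} for access in accesses],
        "delegateAdmin": False,
    }


def folder_policies(service, folder, trusted_groups=("hadoop",)):
    """Return the 'folder' and 'content' policies described above for one top-level folder."""
    base_name = folder.strip("/").replace("/", "-")
    parent = {
        "service": service,
        "name": f"{base_name}-folder",
        "resources": {"path": {"values": [folder], "isRecursive": False}},
        "policyItems": [item(groups=["public"])],
    }
    content = {
        "service": service,
        "name": f"{base_name}-content",
        # Assumption: the content policy is recursive so it also covers nested files.
        "resources": {"path": {"values": [f"{folder}/*"], "isRecursive": True}},
        "policyItems": [item(users=["{OWNER}"], groups=trusted_groups)],
        "denyPolicyItems": [item(groups=["public"])],
        "denyExceptions": [item(users=["{OWNER}"], groups=trusted_groups)],
    }
    return [parent, content]


# 'cluster_hadoop' is a hypothetical Ranger service name.
for policy in folder_policies("cluster_hadoop", "/spark-history"):
    print(json.dumps(policy, indent=2))
```

Each printed document could then be posted to the policy endpoint of the Ranger Admin server, in the same way the service definition was updated earlier.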

Example – /mr-history

Let’s have a look at the folder structure for /mr-history

/mr-history with mapred:hadoop:777

/mr-history/done with mapred:hadoop:777
/mr-history/tmp with mapred:hadoop:777

/mr-history/tmp/<user> with <user>:hadoop:770

/mr-history/done/<year> with mapred:hadoop:770
/mr-history/done/<year>/<month> with mapred:hadoop:770
/mr-history/done/<year>/<month>/<day> with mapred:hadoop:770
/mr-history/done/<year>/<month>/<day>/<folderID> with mapred:hadoop:770

And in that last folder, we only have files (usually an XML file and a JHIST file for each job) with <user running the job>:hadoop:770.

When a job is launched by myUser, a folder (if not already existing) named myUser is created in /mr-history/tmp and this folder will be used to store metadata files about the running job. Once the job is completed, the user mapred will move the file from that folder into the corresponding /mr-history/done subfolder.

In conclusion, we can set chmod 000 on /mr-history and define the following rules:

  • on /mr-history/done
    • grant RWX permissions to group “hadoop” and user “activity_analyzer” (if you’re using SmartSense to provide statistics on your cluster)
  • on /mr-history/tmp, non-recursive
    • grant RWX permissions to group “public” so that anyone can create a folder
  • on /mr-history/tmp/*
    • grant RWX permissions to group “hadoop”, users “activity_analyzer” and “{OWNER}”
    • deny RWX permissions to group “public”
    • exclude deny RWX permissions to group “hadoop”, users “activity_analyzer” and “{OWNER}”

Note – it’s quite similar to the /spark-history approach.

Example – /user

For that specific folder, we know that no one except HDFS superusers should be able to create a folder in /user, and every folder in /user should be something like /user/<user> with permissions <user>:hdfs:700 (just like usual home directories).

Then, we just have to set chmod 000 on the /user folder and add an “allow” policy:

  • on /user/{USER}
    • allow policy granting RWX permissions to user “{USER}”

But we also have a particular situation to manage: the share lib folder of Oozie, by default, is located at /user/oozie/share/lib. We need to allow RX access to that folder to anyone launching Oozie jobs:

  • on /user/oozie/share/lib
    • allow policy granting RX access to group “public”
    • allow policy granting RWX access to group “hadoop”

Since /user is set with chmod 000 we are sure no one can go inside that folder unless an allow policy is created.

Example – /app-logs

Structure of /app-logs is very similar to what we’ve seen so far:

/app-logs with yarn:hadoop:777
/app-logs/<user> with <user>:hadoop:770

We’re adding the following rules:

  • on /app-logs, non-recursive
    • grant RWX permissions to group “public” so that anyone can create a folder (when a user is launching a job for the first time on the cluster, the user needs permission to create the folder)
  • on /app-logs/*
    • grant RWX permissions to group “hadoop” and user “{OWNER}”
    • deny RWX permissions to group “public”
    • exclude deny RWX permissions to group “hadoop” and user “{OWNER}”

We can now set chmod 000 on /app-logs to secure that folder.

Example – /tmp

The /tmp folder is similar to what we did so far, but with an exception: there is a /tmp/hive folder used by Hive to store temporary data when Hive queries are executed. Because of HIVE-18287, this folder needs to have chmod 733 or above (at HDFS ACLs level).

In /tmp/hive, we have folders like /tmp/hive/<user> with <user>:hdfs:700.

To summarize, we need to:

  • Allow users to create folders in /tmp
  • Keep chmod 733 on /tmp/hive
  • Allow users to create folders in /tmp/hive/
  • Only allow owners to modify data in /tmp/*

As you can see, it’s going to be difficult to manage because of the embedded folder used by Hive. The best approach is to change Hive’s configuration to relocate the scratch directory. What I suggest is:

hive.exec.scratchdir=/apps/hive/tmp

This way you can easily define policies on that particular folder by following the same approach we did so far on the other folders. Once this is done, it’s also easy to have the same approach to secure the /tmp folder:

  • on /apps/hive/tmp, non-recursive
    • grant RWX permissions to group “public” so that anyone can create a folder (when a user is launching a job for the first time on the cluster, the user needs permission to create the folder)
  • on /apps/hive/tmp/*
    • grant RWX permissions to group “hadoop” and user “{OWNER}”
    • deny RWX permissions to group “public”
    • exclude deny RWX permissions to group “hadoop” and user “{OWNER}”
  • on /tmp, non-recursive
    • grant RWX permissions to group “public” so that anyone can create a folder
  • on /tmp/*
    • grant RWX permissions to group “hadoop” and user “{OWNER}”
    • deny RWX permissions to group “public”
    • exclude deny RWX permissions to group “hadoop” and user “{OWNER}”

Note – if you are using Spark/Spark2, don’t forget that it relies on Hive’s configuration files. In case you’re using an Ambari managed Hortonworks cluster, you would have to add custom properties in “Custom spark-hive-site-override” for both Spark and Spark2 services.

Note – if you are using Oozie and have workflows using Spark actions, then you’d have to leverage the action configuration feature in Oozie. You can have a look here. In the case of the Hortonworks distribution, by default, you have

oozie.service.HadoopAccessorService.action.configurations=*=action-conf

And you have the following files/folders:

/etc/oozie/conf/action-conf/hive.xml
/etc/oozie/conf/action-conf/hive/hive-site.xml
/etc/oozie/conf/action-conf/hive/tez-site.xml
/etc/oozie/conf/action-conf/hive/atlas-application.properties

The hive-site.xml is automatically copied from the /etc/hive/conf folder when the Oozie server is restarted. But you need to make a change to have this configuration loaded for Spark actions: you can create spark and spark2 folders and, in each, create a symbolic link to the hive-site.xml file.
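Here is a minimal sketch of that last step, assuming the spark and spark2 folders sit next to the hive folder in the action configuration directory and that the links are named hive-site.xml. The function name is hypothetical and the script has to run with enough privileges to write in that directory.

```python
from pathlib import Path


def link_hive_site(action_conf_dir, targets=("spark", "spark2")):
    """Create <target>/hive-site.xml symlinks pointing at hive/hive-site.xml."""
    base = Path(action_conf_dir)
    source = base / "hive" / "hive-site.xml"
    links = []
    for name in targets:
        folder = base / name
        folder.mkdir(parents=True, exist_ok=True)
        link = folder / "hive-site.xml"
        # Leave any existing file or link untouched.
        if not link.exists() and not link.is_symlink():
            link.symlink_to(source)
        links.append(link)
    return links


# Example call (path taken from the listing above):
# link_hive_site("/etc/oozie/conf/action-conf")
```

Since these are links rather than copies, the Spark actions pick up the refreshed hive-site.xml whenever Oozie copies it again on restart.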

Conclusion

The objective of this post was to give a quick overview of how Ranger works for HDFS and what you need to consider if you want to secure your cluster. As you saw in this post, if you also want to prevent rogue users from accessing data by changing the behavior of an application, you need to set a few rules to secure the folders having a default chmod 777 at the HDFS ACLs level.

This post is certainly not exhaustive but should give you an idea of what you can do. As usual, feel free to comment / ask questions, and thanks for reading thus far.