Secured NiFi cluster with Terraform on the Google Cloud Platform

This story is a follow up of this previous story about deploying a single secured NiFi instance, configured with OIDC, using Terraform on the Google Cloud Platform. This time it’s about deploying a secured NiFi cluster.

In this story, we’ll use Terraform to quickly:

  • deploy a NiFi CA server as a convenient way to generate TLS certificates
  • deploy an external ZooKeeper instance to manage cluster coordination and state across the nodes
  • deploy X secured NiFi instances clustered together
  • configure NiFi to use OpenID connect for authentication
  • configure an HTTPS load balancer with Client IP affinity in front of the NiFi cluster

Note — I assume you have a domain that you own (you can get one with Google). It will be used to map a domain to the web interface exposed by the NiFi cluster. In this post, I use my own domain: pierrevillard.com and will map nifi.pierrevillard.com to my NiFi cluster.

Disclaimer — the below steps should not be used for a production deployment, it can definitely get you started but I’m just using the below to start a secured cluster (there is no configuration that one would expect for a production setup such as a clustered Zookeeper, disks for repositories, etc).

If you don’t want to read the story and want to get straight into the code, it’s right here!


What is Terraform?

Terraform is a tool for building, changing, and versioning infrastructure safely and efficiently. Terraform can manage existing and popular service providers as well as custom in-house solutions.

Configuration files describe to Terraform the components needed to run a single application or your entire datacenter. Terraform generates an execution plan describing what it will do to reach the desired state, and then executes it to build the described infrastructure. As the configuration changes, Terraform is able to determine what changed and create incremental execution plans which can be applied.

The infrastructure Terraform can manage includes low-level components such as compute instances, storage, and networking, as well as high-level components such as DNS entries, SaaS features, etc.


What is NiFi?

Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data. Apache NiFi supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic. In simpler words, Apache NiFi is a great tool to collect and move data around, process it, clean it and integrate it with other systems. As soon as you need to bring data in, you want to use Apache NiFi.


Why ZooKeeper?

Apache NiFi clustering

Best is to refer to the documentation, but, in short… NiFi employs a Zero-Master Clustering paradigm. Each node in the cluster performs the same tasks on the data, but each operates on a different set of data. One of the nodes is automatically elected (via Apache ZooKeeper) as the Cluster Coordinator. All nodes in the cluster will then send heartbeat/status information to this node, and this node is responsible for disconnecting nodes that do not report any heartbeat status for some amount of time. Additionally, when a new node elects to join the cluster, the new node must first connect to the currently-elected Cluster Coordinator in order to obtain the most up-to-date flow.


OAuth Credentials

First step is to create the OAuth Credentials (at this moment, this cannot be done using Terraform).

  • Go in your GCP project, APIs & Services, Credentials.
  • Click on Create credentials, OAuth client ID. Select Web application.
  • Give a name like “NiFi”. For Authorized JavaScript origins, use your own domain. I’m using: https://nifi.pierrevillard.com. For Authorized redirect URIs, I’m using: https://nifi.pierrevillard.com/nifi-api/access/oidc/callback. Please adapt with your own domain (note there is no port as we’ll use the load balancer to access the cluster)
  • Click Create
Create the OAuth credentials

Once the credentials are created, you will get a client ID and a client secret that you will need in the Terraform variables.

By creating the credentials, your domain will be automatically added to the list of the “Authorized domains” in the OAuth consent screen configuration. It protects you and your users by ensuring that OAuth authentication is only coming from authorized domains.

Download the NiFi binaries in Google Cloud Storage

In your GCP project, create a bucket in Google Cloud Storage. We are going to use the bucket to store the Apache NiFi & ZooKeeper binaries (instead of downloading directly from the Apache repositories at each deployment), and also as a way to retrieve the certificates that we’ll use for the HTTPS load balancer.

Note — you’ll need Apache ZooKeeper 3.5.5+.

You can download the binaries using the below links:

Here is what it looks like:

Content of the bucket in Google Cloud Storage

Note — you’ll need to use the NiFi Toolkit version 1.9.2

Deploy NiFi with Terraform

Once you have completed the above prerequisites, installing your NiFi cluster will only take few minutes. Open your Google Cloud Console in your GCP project and run:

Deploy script

If you execute the above commands, you’ll be prompted for the below informations. However, if you don’t want to be prompted, you can directly update the variables.tf file with your values to deploy everything.

Variables to update:

  • project // GCP Project ID
  • nifi-admin // Google mail address for the user that will be the initial admin in NiFi
  • san // FQDN of the DNS mapping for that will be used to access NiFi. Example: nifi.example.com
  • proxyhost // FQDN:port that will be used to access NiFi. Example: nifi.example.com:8443
  • ca_token // The token to use to prevent MITM between the NiFi CA client and the NiFi CA server (must be at least 16 bytes long)
  • oauth_clientid // OAuth Client ID
  • oauth_secret // OAuth Client secret
  • instance_count // Number of NiFi instances to create
  • nifi_bucket // Google Cloud Storage bucket containing the binaries

Here is what it looks like on my side (after updating the variables.tf file):

Execution of the deploy script

Explanations

The first step is to deploy the NiFi Toolkit on a single VM to run the CA server that is used to generate certificates for the nodes and the load balancer. Once the CA server is deployed, a certificate is generated for the load balancer and pushed to the Google Cloud Storage bucket.

The script you started is waiting until the load balancer certificate files are available on GCS. Once the files are available, files are retrieved locally to execute the remaining parts of the Terraform template. It will deploy the ZooKeeper instance as well as the NiFi instances and the load balancer in front of the cluster. All the configuration on the NiFi instances is done for you. Once the script execution is completed, certificates files are removed (locally and on GCS).

After 5 minutes or so…

The load balancer has been created and you can retrieve the public IP of the load balancer:

Retrieve the external public IP of the HTTPS load balancer

You can now update the DNS records of your domain to add a DNS record of type A redirecting nifi.pierrevillard.com to the load balancer IP.

I can now access the NiFi cluster using https://nifi.pierrevillard.com and authenticate on the cluster using the admin account email address I configured during the deployment.

Here is my 6-nodes secured NiFi cluster up and running:

6-nodes secured NiFi cluster
6 nodes with the elected primary and coordinator nodes

I can now update the authorizations and add additional users/groups.

Note — you could use Google certificates instead of the ones generated with the CA server to remove the warnings about untrusted certificate authority.

Cleaning

To destroy all the resources you created, you just need to run:

terraform destroy -auto-approve

As usual, thanks for reading, feel free to ask questions or comment this post.

Running visual quality inspection at the edge with Google Cloud and Apache NiFi & MiNiFi

On the 23rd of October 2019, I gave a talk at the Apache Con in Berlin about running visual quality inspection at the edge. This is the story describing the talk and helping you use both Google Cloud and Apache NiFi & MiNiFi to continuously run updated TensorFlow models at the edge.

Context

Picture this: you are in a factory making cookies… you have thousands of cookies going through your production lines every day and you want to make your customers happy. Problem is… a broken cookie makes an unhappy customer! So you want a way to detect the broken cookies before they get into the final packaging.

This is the example I am choosing to support this story but it could really be anything related to visual quality inspection which spans across multiple industries such as retail, manufacturing, energy, etc. Anything that you can think of that would need a ML model trained over a dataset of images.

What is Apache NiFi & MiNiFi?

Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data. Apache NiFi supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic. MiNiFi — a subproject of Apache NiFi — is a complementary data collection approach that supplements the core tenets of NiFi in data flow management, focusing on the collection of data at the source of its creation.

In simpler words, Apache NiFi and MiNiFi provide a great combination of tools and features to collect and move data around, process it, clean it and integrate it with other systems. As soon as you need to bring data in, you want to use Apache NiFi.

What is Google Cloud Vision?

Google Cloud Vision allows you to derive insights from your images in the cloud or at the edge with AutoML Vision or use pre-trained Vision API models to detect emotion, understand text, and more.

With Google Cloud Vision, you have two options:

  • AutoML Vision: automate the training of your own custom machine learning models. Simply upload images and train custom image models with AutoML Vision’s easy-to-use graphical interface; optimize your models for accuracy, latency, and size; and export them to your application in the cloud, or to an array of devices at the edge.
  • Vision API: Google Cloud’s Vision API offers powerful pre-trained machine learning models through REST and RPC APIs. The APIs cover: image classification and labelling into millions of predefined categories, detect objects and faces, read printed and handwritten text, build valuable metadata into your image catalog, moderate content, etc.

In this story we are going to build our own custom machine learning models with our own labels and images using AutoML Vision.

Objective: continuous model retrain

Automatic retrain of the custom model
Automatic retrain of the custom model

The objective is to have an end-to-end system to detect the defects while having a continuous model retrain with the newly collected images. Key benefits are:

  • Automatically train customized ML models in the cloud
  • Efficiently acquire images, label images, deploy model and run inference
  • Continuously refresh models using fresh data from the production lines
  • Minimal human interaction
  • ML model accuracy improving over time

Using a Raspberry Pi at the edge

To collect data from the production lines, I’m going to use a Raspberry Pi on which I’ll be running Apache MiNiFi. I added a camera to take pictures as well as LEDs to give visual indications of what is currently being processed.

Raspberry Pi 3 used to collect data
Raspberry Pi 3 used to collect data

In order to collect data from the edge with the Raspberry Pi, I’m going to use Google IoT Core which is a fully managed service to easily and securely connect, manage, and ingest data from globally dispersed devices.

First step is to create a Device Registry which is a container of devices with shared properties. Once the Registry is created, you can register your device with its device identity. To do that you need to generate a public/private key pair that will be used by the device to authenticate against Google IoT Core (you can also use Certificate Authorities). Here are the commands I used to generate my key pair for the Raspberry Pi.

My device is registered in my device registry
My device is registered in my device registry

Then I’ll be using the MQTT protocol to send data between my device and Google Cloud, it will automatically make the data available into a Pub/Sub topic:

Interface between the device and Google Cloud IoT Core
Interface between the device and Google Cloud IoT Core

To communicate with Google Cloud IoT Core through MQTT, I’ll be using a processor that is currently under code review in a Pull Request on Github. I’ll also make this processor available in a repo I created to share the code snippets of my talk.

One problem, two architectures

When we’ll have collected pictures from our devices, we’ll be able to train our custom model. Then we’ll have to choose between two options:

  • Deploy the model in Google Cloud and do the inference in the cloud. It means that for each picture, the Raspberry Pi with Apache MiNiFi will make an HTTPS call to the served model to get the label of the picture. While this is efficient and is easy to implement and scale, this means making a call over the Internet and a longer inference time. In this story that what I’ll call the mode “cloud” when configuring the device. In the cloud mode, here is what the architecture looks like:
Architecture in the “cloud” mode
Architecture in the “cloud” mode
  • Download the model from Google Cloud on the Raspberry Pi and do the inference at the edge using Apache MiNiFi. Google Cloud provides multiple options to download your custom models with various accuracy versus latency parameters. Running the model on the edge will lead to much faster inference time. In this story that’s what I’ll call the mode “edge” when configuring the device. In the edge mode, here is what the architecture looks like:
Architecture in the “edge” mode
Architecture in the “edge” mode

We’ll also see that, in the edge mode, it’s possible to get an even better inference time using Google’s hardware with the Coral Edge TPU.

Collecting pictures

Here is what the workflow running in MiNiFi (on the Raspberry Pi) looks like:

Workflow running in MiNiFi on the Raspberry Pi to collect images
Workflow running in MiNiFi on the Raspberry Pi to collect images

The workflow is really simple: a “standalone” processor executes a Python script to take pictures at a given frequency using the Pi camera. Then the pictures are fetched into MiNiFi to be sent to Google Cloud IoT Core through the MQTT processor. I’m also switching on/off the amber LED when this process occurs to give a visual indication on the Pi about what’s going on.

As we said, the data will be automatically sent to a Pub/Sub topic and made available for consumption. In the Google Cloud Platform, I’m running a standalone secured Apache NiFi instance (see my previous post for a very quick deployment using Terraform) on which the below workflow is running:

Workflow running in NiFi in GCP to ingest images
Workflow running in NiFi in GCP to ingest images

The pictures are pulled from my topic’s subscription, then the images are stored in Google Cloud Storage and I’m using the Google Cloud Vision API to add the pictures into my dataset (which is the collection of images I’m using to train my custom models).

I won’t go too much into the details as the documentation is self-explanatory but here is the call I’m making — it gives the GCS path of the CSV file I created listing the GCS paths of all the images I ingested during the last X minutes:

curl \
  -X POST \
  -H "Authorization: Bearer $(gcloud auth application-default print-access-token)" \
  -H "Content-Type: application/json" \
https://automl.googleapis.com/v1beta1/projects/${projectID}/locations/us-central1/datasets/${datasetID}:importData \
  -d '{
        "inputConfig": {
          "gcsSource": {
            "inputUris": "gs://${gcs.bucket}/dataset.csv"
          }
        }
      }'

Note 1: the dataset ID is something looking like: ICN4695798657952251904.
Note 2: in NiFi, I externalized a lot of variables (project ID, GCS bucket, dataset ID, etc) that I reuse across the workflow to make it much more easier to use and configure.

Managing your device configuration

Once your device is registered in the Device Registry in Google Cloud IoT Core, you can use the UI to send configuration updates or commands to your devices. Here is what it looks like:

Updating the configuration of my device through MQTT
Updating the configuration of my device through MQTT

On the Raspberry Pi, in MiNiFi, the configuration updates and commands are received by the MQTT processor and I process the received data to update the configuration of my device:

Reception of configuration updates and commands
Reception of configuration updates and commands
Processing configuration updates
Processing configuration updates

When I receive a configuration update, I process the JSON payload to extract the mode and if I should use the Edge TPU or not, I then store this information into a local cache that is also persisted on disk in case of device restart.

Training and serving your model

We now have a device that we are able to configure from anywhere, that is automatically sending pictures from our production lines, and the pictures are automatically stored and added to our Google Cloud Vision dataset to train our custom models.

The next step, in Google Cloud Vision, is to create labels that will be used to classify our images. In this case, I only create two labels:

  • OK: the cookie looks good and can go into the packaging
  • NOK: the cookie is broken and should be removed

In the UI, I’m able to manually label the images and I need, at least, 10 images per label to start training my custom model (the more you have, the better the results are, and that’s why we want to continually ingest new images and retrain our model over time):

My dataset with the pictures of my cookies and my labels
My dataset with the pictures of my cookies and my labels

In my NiFi instance running in GCP, I created a workflow which is used to trigger a model training every day in order to take into account the newly captured images and improve the model’s accuracy over time:

Model training is triggered daily in NiFi
Model training is triggered daily in NiFi

To start the custom model training and wait until the training is completed I’m using the REST APIs described in the documentation. However, at this stage, you have to specify if you want to train a model that will run in the cloud (cloud mode) or at the edge (edge mode):

When training a model to be exported at the edge, you can specify a parameter allowing you to choose if you prefer latency over accuracy:

  • mobile-low-latency-1 for low latency,
  • mobile-versatile-1 for general purpose usage, or
  • mobile-high-accuracy-1 for higher prediction quality.

When a cloud-hosted model is being trained, the Google Cloud Vision UI would look like this:

Cloud-hosted model currently being trained
Cloud-hosted model currently being trained

Once the model training is completed, NiFi will receive a JSON payload such as:

JSON payload when model training is completed
JSON payload when model training is completed

Once the model is trained, we can access a lot of information regarding the model training and its accuracy compared to the provided dataset:

Custom model information about precision, recall, etc
Custom model information about precision, recall, etc

Cloud-hosted model

Once the model is trained, we can call the REST APIs to deploy the model. Here is the part of the NiFi workflow in charge of this:

NiFi deploying the cloud-hostel custom model
NiFi deploying the cloud-hostel custom model

Once the deployment is completed, the model will be automatically exposed through a REST API in Google Cloud (you don’t have to worry about the how and the where, Google takes care of it). The only information you need is the model ID and this is the information we are sending to the Raspberry Pi using the Google IoT Core commands with the ExecuteStreamCommand processor:

gcloud iot devices commands send \
    --command-data=ICN147321363982450688
    --region=REGION  \
    --registry=REGISTRY_ID \
    --device=DEVICE_ID

In MiNiFi, we are receiving the command and storing the model ID in the cache:

Receiving the model ID through the Google IoT Core MQTT command
Receiving the model ID through the Google IoT Core MQTT command

We can now perform the inference for each picture we capture by making an HTTPS call against the exposed API:

Inference in the cloud using the exposed API
Inference in the cloud using the exposed API

Note the RouteOnAttribute processor that will check in which mode the device is configured. In this case, the device is configured in cloud mode.

The picture payload needs to be base64 encoded and the API to use is:

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth application-default print-access-token)" \
  -H "Content-Type: application/json" \
https://automl.googleapis.com/v1beta1/projects/${PROJECT_ID}/locations/us-central1/models/${MODEL_ID}:predict \
  -d '{
        "payload" : {
          "image": {
            "imageBytes" : "/9j/4AAQSkZJRgABAQAAAQ … "
          }
        }
      }'

Edge exportable model

In this mode, once the TensorFlow model is trained, we just need to make an API call to export the trained model into Google Cloud Storage:

API calls to export the custom edge model into Google Cloud Storage
API calls to export the custom edge model into Google Cloud Storage

The API to use is:

curl \
  -H "Authorization: Bearer $(gcloud auth application-default print-access-token)" \
  -H "Content-Type: application/json" \
  https://${ENDPOINT}/v1beta1/projects/${PROJECT_ID}/locations/us-central1/models/${MODEL_ID}:export \
  -d '{
        "output_config": {
          "model_format": "tflite",
          "gcs_destination": {
              "output_uri_prefix": "${USER_GCS_PATH}"
          }
        }
      }'

In case you want to export a model that is optimised for the Edge TPU (we’ll talk about this in a bit), then you have to use:

curl \
  -H "Authorization: Bearer $(gcloud auth application-default print-access-token)" \
  -H "Content-Type: application/json" \
  https://${ENDPOINT}/v1beta1/projects/${PROJECT_ID}/locations/us-central1/models/${MODEL_ID}:export \
  -d '{
        "output_config": {
          "model_format": "edgetpu_tflite",
          "gcs_destination": {
              "output_uri_prefix": "${USER_GCS_PATH}"
          }
        }
      }'

With this API call, NiFi will receive a JSON payload with the GCS path to the tflite file representing your custom model. Here again we are using a Google Cloud IoT Core command to send this information to our device through MQTT:

gcloud iot devices commands send \
    --command-data=gs://.../model.tflite
    --region=REGION  \
    --registry=REGISTRY_ID \
    --device=DEVICE_ID

On MiNiFi, we are receiving this information and downloading the model directly on the device so we can do local inference:

Receiving the GCS path of the model through MQTT and downloading the model locally
Receiving the GCS path of the model through MQTT and downloading the model locally

Now we have the TensorFlow lite model downloaded on the Raspberry Pi, we can perform the inference on every captured image:

Inference on the edge using the downloaded TensorFlow lite model
Inference on the edge using the downloaded TensorFlow lite model

To execute the model locally, I’m executing a Python script that you can find here.

Boosting your device with the Coral Edge TPU

Since the 22nd of October 2019, Coral is now GA! Coral is a hardware and software platform for building intelligent devices with fast neural network inferencing.

At the heart of the devices is the Edge TPU coprocessor. This is a small ASIC built by Google that’s specially-designed to execute state-of-the-art neural networks at high speed, with a low power cost. The Edge TPU is capable of performing 4 trillion operations (tera-operations) per second (TOPS), using 0.5 watts for each TOPS (2 TOPS per watt).

Edge TPU benchmark by Google
Edge TPU benchmark by Google

In my use case, I’m using the USB accelerator that I’m directly plugging to the Raspberry Pi.

Coral USB accelerator
Coral USB accelerator

Using the custom ML models I trained on Google Cloud Vision, I can easily compare the TensorFlow “invoke” time when using or not the Edge TPU:

  • When not using the Edge TPU optimised model:
TF Lite invoke time when not using the Edge TPU — about 127ms/image
TF Lite invoke time when not using the Edge TPU — about 127ms/image
  • When using the Edge TPU and the optimised model:
TF Lite invoke time when using the Edge TPU — about 9ms/image
TF Lite invoke time when using the Edge TPU — about 9ms/image

We get about 127ms without using the Edge TPU while we only need 9ms when using the Edge TPU on a low latency optimised model generated for my cookies use case with Google Cloud Vision AutoML.

Preliminary results

So far we have sent the pictures through MQTT and performed the classification inference into two distinct branches of the MiNiFi workflow (in other words: pictures ingestion and pictures inference are done in parallel). Here are the results we get (processing time is the duration between the moment we take the picture and the moment we get the label and the confidence score of the classification ; inference time is the duration of the “invoke” call to get the classification using the TensorFlow Lite model):

  • cloud mode (inference in the cloud through HTTPS):
    – Processing time: about 6 seconds per image
    – Inference time: about 2.5 seconds per image
  • edge mode (without the Edge TPU):
    – Processing time: about 750 milliseconds per image
    – Inference time: about 127 milliseconds per image
  • edge mode + TPU(with the Edge TPU):
    – Processing time: about 500 milliseconds per image
    – Inference time: about 9 milliseconds per image

Monitoring and auto-labelling

What we have done so far is great but, once we made the inference on the Raspberry Pi, we take the action to keep or not the cookie on the production line (in my case, I switch on and off the red or green lights) and we forget about the inference results. The next step is to perform the inference before sending the pictures through MQTT so that we can send the inference results along with the pictures. Pictures inference and pictures ingestion are done sequentially. This has two main key benefits:

  • We can collect additional data to deploy monitoring dashboards in order to have accurate information about how our custom models are performing on our devices
  • We can introduce auto-labelling of the images: if the confidence score of the inferred label is over, for example, 90%, we could auto-label the image when ingesting it into the Google Cloud Vision dataset so that minimal human interaction is required when it comes to labelling: only the images representing new types of defects might need human labelling.

I won’t go into the details of the required changes in the workflows but on MiNiFi it looks like this:

MiNiFi workflow when sending inference results along with pictures through MQTT
MiNiFi workflow when sending inference results along with pictures through MQTT

Once we receive all the information in NiFi, we can add this information into Stackdriver monitoring dashboards and see in real-time how our custom models are performing:

Processing time and score for cloud mode versus edge mode
Processing time and score for cloud mode versus edge mode

In the above picture we can see how the processing time and the confidence score is changing while going from the cloud mode to the edge mode after updating the device configuration. We clearly see that the processing is much faster but we are losing a little bit in accuracy (still above 90% though!).

We can also compare the results when using the Edge TPU or not while the device is configured in edge mode:

Processing time and score for edge mode with and without Edge TPU
Processing time and score for edge mode with and without Edge TPU

In the above picture,we can see an interesting result: when using the Edge TPU with the Edge TPU optimised model, we get a lower processing time while also getting a higher confidence score for the inference. No reason not to use it!

Conclusion

First, thank you for reading this very long story… Then, here is the conclusion of it: thanks to Apache NiFi, MiNiFi, the products of the Google Cloud Platform and the Coral products, we implemented an efficient end-to-end solution to run custom ML models on the edge while continuously refresh our models with new data to improve the models accuracy over time with no code and minimal human interaction. I’ll leave it to your imagination to transpose all of this to your very own use case!

The slides of the talk, the recording as well as code snippets, custom processors, workflows, scripts, etc, will be added in the coming days on this Github repository. As usual, feel free to comment/ask questions.

NiFi with OIDC using Terraform on the Google Cloud Platform

When I present Apache NiFi during talks or meetings, I have to quickly start and stop instances. It’s very easy to do it on your own laptop with Docker, but it’s even better to have it running in the cloud and use IAC (Infrastructure As Code).

It’s very easy to start Apache NiFi on the Google Cloud Platform in a Compute instance, expose it on the Internet and have everything running. It just takes two commands and few seconds… Go in your GCP project, start the Cloud Shell console and run the two below commands:

gcloud beta compute instances create-with-container my-nifi-instance --tags=nifi --container-image=apache/nifi
gcloud compute firewall-rules create allow-nifi-unsecured --action=ALLOW --rules=tcp:8080 --target-tags=nifi

You just started a Compute instance with the latest version of Apache NiFi and exposed it to anyone on the internet. You just need to get the external IP of your instance and you can access the UI on http://external_ip:8080/nifi.

It’s great but you need to understand that your instance is not secured and exposed to anyone. In short… you should never do that. At the very least, get your IP and restrict the access to the instance to your own IP.

But please… Security must be a first class citizen and the Apache NiFi community is really doing an amazing job to give you the best options to secure your instances.


In this post I show you how to use Terraform to start a secured NiFi instance configured to use OpenID Connect (OIDC) for authentication.

Note — I assume you have a domain that you own (you can get one with Google). It will be used to map a domain to the web interface exposed by NiFi. In this post, I use my own domain: pierrevillard.com and will map nifi.pierrevillard.com to my NiFi instance.

Disclaimer — the below steps should not be used for a production instance, I’m just using the below to start a secured instance with a single user access for short demos (there is no configuration that one would expect for a production or long-lived instance).


OAuth Credentials

First step is to create the OAuth Credentials (at this moment, this cannot be done using Terraform). 

Once the credentials are created, you will get a client ID and a client secret that you will need in the Terraform variables.

By creating the credentials, your domain will be automatically added to the list of the “Authorized domains” in the OAuth consent screen configuration. It protects you and your users by ensuring that OAuth authentication is only coming from authorized domains.


Deploy NiFi with Terraform

I’ll go a bit deeper of what I’m doing in the next parts, below are just commands to deploy everything. Go in your GCP Project, and start the Cloud Shell console.

git clone https://github.com/pvillard31/nifi-gcp-terraform.git
cd nifi-gcp-terraform/gcp-single-secured-nifi-oidc/
terraform init
terraform apply

When applying the Terraform configuration, it’ll ask for some information:

  • The token to be used between the NiFi CA and the NiFi instance to generate certificates. You can use a random string which is at least 16 bytes long.
  • The Google email address of the user that will be the initial admin for the NiFi instance.
  • The OAuth Client ID and Secret you got before.
  • And the sub-domain that you will configure and use to access your NiFi instance. In my case: nifi.pierrevillard.com.

Access NiFi

Once the Terraform configuration is applied. You need to map your subdomain to the static IP of the NiFi instance. Go on your GCP Project, on the Compute Engine page and get the external IP of the NiFi instance:

Once you have the external IP, go to your DNS provider page and add a ‘A’ record to your DNS records with the subdomain pointing to the external IP. The exact steps depend on your DNS provider.

Once done, you should be able to access the NiFi UI using your subdomain on the port 8443: https://nifi.pierrevillard.com:8443/nifi.

You will most probably get a warning from your browser because of the untrusted certificate authority. That’s because we generated a CA certificate to sign the NiFi certificate. You can ignore the warning for a demo but otherwise you should use a trusted CA certificate:

Once you proceed to the website, you’ll be redirected to the Google authentication page asking for your credentials. That’s because we configured NiFi to use OpenID Connect to delegate the authentication to Google. At this point, you can only authenticate using the Google address you provided as initial admin for NiFi:

Then you are connected as the user and can access the canvas:

Before being able to design you first workflow, you’ll need to go to the “Policies” menu to grant you the required permissions. You can also go to the “Users” menu to add additional users that will be able to authenticate on the UI and give them the appropriate permissions. 


Details

The Terraform configuration files are on Github:

  • provider.tf to define the Google Cloud provider with the basic GCP project information
  • network.tf to create a network and subnetwork dedicated to the NiFi CA instance and the NiFi instance
  • firewall.tf to create the rules to allow internal communications, SSH access to the instances and access to the NiFi instance on the port 8443
  • nifi-ca.tf to install the NiFi Certificate Authority (provided with the TLS toolkit) in server mode in order to create a certificate authority and sign the certificates for the NiFi instance
  • nifi.tf to install the NiFi instance, get the certificate from the NiFi CA, generate the keystore and truststore and configure the NiFi instance to be secured and use OpenID Connect for authentication
  • variables.tf to define some variables to be used to customize the deployment (more variables could be added)

Remember, this is a basic deployment of NiFi but you have a secured instance with Google delegated authentication.

Note — To delete all the created resources, you can use ‘terraform destroy’.

There is much more to do to get closer to a production ready deployment but it gets you started to play with NiFi on the Google Cloud Platform. I’ll add more features in my next posts (the immediate next step will be to add a secured NiFi Registry instance that is connected to the Google Cloud Source Repositories).

Thanks for reading, feel free to ask questions or comment this post!

NiFi & NiFi Registry on the Google Cloud Platform with Cloud Source Repositories


This post is about quickly and easily deploying an unsecured instance of NiFi and an unsecured instance of the NiFi Registry which uses the Cloud Source Repositories service as backend for the flow persistence provider.

The objective is to quickly deploy NiFi and the NiFi Registry, connect the two together, version the workflows in the Source Repositories, and be up and running quickly to start building workflows. This is not suitable for production deployment as we are not securing the instances (I’ll talk about that in another post).

Also this story is about a new feature in NiFi Registry 0.4.0 (NIFIREG-209) which allows the NiFi Registry to rebuild all the metadata from an existing Git repository of flows. It’s a very nice feature when you start and stop NiFi instances on the fly while also having access to your versioned flows very easily. Actually, using this feature, we could run the NiFi Registry in Google Cloud Run and have the production instances of NiFi just pulling the versions of the flows from the NiFi Registry exposed by Google Cloud Run. By doing that you would leverage the advantages of serverless. If you are interested by Google Cloud Run, you might be interested about this post for running NiFi workflows in Cloud Run.


Setup Source Repository

I start creating a fresh new project in my Google Cloud Platform console. I call this new project ‘nifi-registry’. Once the project is created, I go into Source Repositories. If it’s your first time, click on ‘Get started’ and ‘Create repository’.

Source Repositories is the Google Cloud offer to get free unlimited private Git repositories to organize your code in a way that works best for you (you can also mirror code from GitHub or Bitbucket repositories to get powerful code search, code browsing, and diagnostics capabilities). It also nicely integrates with CI/CD tools.

In my case, I create a new repository that I call ‘nifi-flow-repository’.

Let’s now setup the SSH key to allow access to the repository.

$ ssh-keygen -t rsa -b 4096 -m PEM -C "NiFi Registry"

Generating public/private rsa key pair.
Enter file in which to save the key (~/.ssh/id_rsa): ~/temp/id_rsa
Enter passphrase (empty for no passphrase): 
Enter same passphrase again: 
Your identification has been saved in ~/temp/id_rsa.
Your public key has been saved in ~/temp/id_rsa.pub.
The key fingerprint is:
SHA256:/hu6FZLvcvDigP4ixPFPEGzfpY1RHOoTRytjqYOoYHo NiFi Registry
The key's randomart image is:
+---[RSA 4096]----+
|   .     o+.     |
|    +   .+o.     |
|   . o .B*o      |
|  .....++*.      |
|.o.o..o S .      |
|+.o ...o.+ .     |
|o.E .o. .o+      |
| .... ..o=o.     |
|   ..o..+=+.     |
+----[SHA256]-----+


$ ls
id_rsa     id_rsa.pub

For this demo to work, we generate a PEM encoded key and we use an empty passphrase (again, this is not ideal for production). Once done, you can register the SSH key with Google Cloud (there is a link available after you created the repository). You just have to give a name to the key and copy the content of the generated id_rsa.pub file.

Start the NiFi Registry

We can now focus on starting the NiFi Registry. To be up and running very quickly, I’m going to rely on the Docker image provided by Apache NiFi and use it in a simple Compute Engine instance with Docker enabled.

In Compute Engine / Instance templates, you can create a new template. Here is my setup with the parameters I changed (adapt it to your needs):

  • Name : nifi-registry
  • Check “Deploy a container image to this VM instance”
  • Container image : apache/nifi-registry
  • Go into “Advanced container options”
  • In the volume mounts (use Directory as volume type) :
  • Go into “Management, security, disks, networking, sole tenancy”
  • Add the below startup script :
#! /bin/bash

# This script is used when starting a docker image based GCE instance
# of the NiFi Registry. It is intended to configure the NiFi Registry
# so that the persistence provider is the Google Cloud Source Repo.

# Docker volumes (directory type)
# /tmp/config => /home/nifi/.ssh - Read only
# /tmp/ssh/id_rsa => /id_rsa - Read only
# /tmp/nifi-flow-repository => /nifi-flow-repository - Read/Write
# /tmp/providers.xml => /opt/nifi-registry/nifi-registry-0.4.0/conf/providers.xml - Read only

# Note that 1000 is uid/gid for NiFi user/group in the Docker container

# create directory for SSH keys
mkdir /tmp/ssh

# private SSH key to authenticate againt the Google Cloud Source Repo
# its associated public key needs to be registered on Source Repo
# this is the content of the id_rsa file we generated, change it with yours!
touch /tmp/ssh/id_rsa && chmod 600 /tmp/ssh/id_rsa
cat <<EOF >> /tmp/ssh/id_rsa
-----BEGIN RSA PRIVATE KEY-----
MIIJJwIBAAKCAgEAxRVsCHEUnKJXZnJ8GZVb/D0bvCrzK/p8jYiuleUqCgrqJ+2D
hocKkLqPKU5yiiA11vDCgrn7GBkWUH+Naj6rc9qZviwJtuZFjfKt+gg9++JLsnQn
8TdfbZ7nKZ2fh397Yr50mLP9wJw6A0nd68Y9YowRID64ZJ+kfqtnr5gaU5wI6/j+
UEV8QO2hpQfJ+hI0TUe82kY3l21J4FcLGbKsZSwMjRDRIG9fly4adEVsPq+WZFJ1
6q8ZsGfLpArCORQiM81mBFTeeLn65FZwDEsAIko3Fv+OVBjv+YtpaUOL4QwM9E3X
9v+0WqkKG0MYm12FjHkulIdHbO9gVIXKM8sIyI+J49fxWiLnb+HiYExCvdgaKCAf
eR12zIjTXHrX8YHF5uxfE0oCMgPcZIcsSVO0zjNNKniJy4AWU46hxB8USI7Kf9jD
i/YVRmDi67mXSi6z8FjamccIWg8CnWVkLTr771ERZATZAn7YDY7Zu8Z+AXE6U12o
7hkcB940B+hHnHrB32iKiOev+NLR+NCde1YUBHfaw6vmmsBwrmO9H/cSGz9L1aC/
seSOBrnWef/YZXAXtMb1maSGpVEW+74uR05UL5Bt5xlB+MAQGRcDWkMJ08teqYGZ
IOgqqeESihXq2Suk1+nMhO/kUHdUBHVHXBmQTqtRMKjMyyRaM/rU/BwF0PECAwEA
AQKCAgBEFI1YMS/sl8vXUO77q3O2I6nkC3YrGAFUpwWgNCSciX6vwkVwAFAvBLMV
ksrQWiYfFNYscHCDt47Uyesg63swry/y2KWWb99RFKbyu0wmKdr6T1PI6NbnOEAk
SRXlqa0GxEHkyjB7C7yijC7EFpv34ei8mc36vIcHVtCfgMx/W2Rdl4rKUeaFS1FO
f+1OnGFC3OgpAp6Lia4+d/MWsnkJDflb9ZY4PSDpSXzL83tcAC3UhAPFySz3mjNn
fGvxaboS6P43utWLILvBivZz2Ki4L6juIeOZu2+SZY1JVpMIb7A96HIVGenEc6ZR
GpFyghDzIJ1to3mR8PL236ykzZs/iRbRGOvHSHNFiAmKMIXqeYe+Q6u/9mGVo68J
Vbw0PeTWaaPnnBM3iVi2d3STGsI6x2bTtosDyRS5YLNztj0Y8mOSWQz8JvnTcikM
lKBcSbA3K/ophzSyjk8sN65HqM1JQ0v/bK6w8TtZBLThPwcqRtbvYQBCQC7g74Oe
5KAuJRDTKDSxkLlD9AX0hdMIMIVKNd5h8Sx7vlX6DZ/5PdLT87eIS39mQlXNjE6W
kvGPte1YlTqWFECmxHlxvSEMB6txEzUrfEVLpTdKkpnygQFJfZdKzTiIsFiqKn9s
s6IAuzyHBJ6MMUdstZ8DMMnE/uXWU3ouUemSLcRiPMXi5AlZkQKCAQEA+EuQ75un
9PS/D6pCcOrZaodK0J2Xz4KX2VkuEJeP/KmDAFwcB0HzpQO6zQglIf8Sv9r3/g10
qk6pt5Y+gqBnpQOV/uUgN7U0lTMAnxx4l3RxXKZW3cqEYL326ouzRKDjz7TxYiWK
T+RT4QlK0jm7NfqZ8trqwi4dB4Nz5JA4MItoSinyWYzRfnaNH8jBxYmRyDGWQLsq
y2Dep6AZ2GPez473xl6TdwTc47UnDYbyuXdhK4yV+Jr9quYUkda+gFVlh7ZEK/vi
WsBoh0IjG07C4m8rNdxHnfplH9MIFo+I2u+hk6Ot4hpTc9b+Vj93FidSnboOEjDh
YD6MM1A8gVJdtQKCAQEAyzMJVHOAWq0F14h6vUY9m8lkw3KBZorfTsDD39kG3P7C
lh/VCCqk7Uz6drkPc6Gb/xp1LgCDhdxxNISos/92IINFwTmg1bQpaCBsfgkM+r2g
S87hUdCL3NCLwsrS67oQjalEAVu8TZ2obCZ48EgasiY48tq7SrQ7+tW7PSjnO5pv
pM661m5Gmp0/y7+0CkPg2JyGp4mwlYY5PgHUckpjwl7dHiDqTSYypOH0rX8i0UF7
NlOpWIFK1/o3K5YzenJwLIm9tdhG+AJ4ZXUOL+ihoUp+uyac33jCAsg34n2spEgi
F/5V8SZ/oYImBeqZJBSf/eOllAVQMHoNyU3F7SILzQKCAQAul6+I5wKresnnnKF/
IvLNhLnLT+dO70ijZpK4VliUpxKIHMC9s+iOjJafJuog6QaRrftfVxMPald6tdzh
EkygsH2TKVfUXFKTtNBnCyat9RHYuvYOBJS2uq788F4hgLd/IIszSawctdHvppi9
vkudI3uEEQSAj7qu0EINH+sLYP2e/SQXHc+qFYEB9+A0u0357SQu3XB4XaMqfWac
LpF/DWr9dH3jlawFpta/ORWPLBG15Fm4Hw1+5lHx1ARHfL7iqpc8UbX2Jaj3yLdh
xnPXndjT8JQX1wbm4+jeouyheNovJEXa6enDERMFCD/GjnZ4VpORYk7IirQwZNwV
wGkJAoIBACYlU5gOAseC+bjHfzsvB3vKZ+clBNPKODehimPoaxhrnv3txeE9mC6Q
J+jHvvXXHeDbB6p2IDqt8naIfN8lkvhxjFPEzMOxiaBpjdRvQIeFt6Tjlnr0an0u
jT4pM0VbbaYaY5DZttTfRvHemw+IibJt6Hz2wPg6M5RYMUk+94HB8TmAMaT1mL2H
zaUjPNo8eeZQJBsphwPa6b4RO9+kxWuEwX/ZcALBq/o2DOfRGSktYMMHG6BozTMU
Xu0IymsvNo214e5URqZiWFW7jslBo64SvQ3HJuXw1oMNMSiMrS8992CHt3yI3Kbm
NtsfelZCpPJVnQzXnoErOJFUz1Y/8PUCggEASI2Uky/TDLCU5Inm2hN1zzozF/kh
pvKks4Kd93BSiMWDHg07HtfkGPBINQdwGScLuTK63uHEuVTB1e1VM+Kd+FqoddoA
xEfdFKoDMgyhE2KNg9iiePbkeXzCCY9yFD2xQ+zfMdqd/755Fg356IfETSx59WzO
vNTQShrwj5zOE1H3wl+YYsAWA1AiHslzwVp5pO26KhQLMO90q5j2g7gJEWOfhUDi
bkhJu3rXdJpiYOCngy0vKiSFYrvFfYh4hZ4+mL/TzWIuJtC0zmSetk1VwOYBkwJZ
Uo1bAaMdFJMC/QAWiJv95jGuusV39nT7pxd93sW1Iiv9OXnsxj8OeShEVg==
-----END RSA PRIVATE KEY-----
EOF

# clone the Google Cloud Source Repository
cd /tmp
ssh-agent bash -c 'ssh-add /tmp/ssh/id_rsa; git clone ssh://admin@pierrevillard.com@source.developers.google.com:2022/p/nifi-registry-245014/r/nifi-flow-repository'
chmod 755 /tmp/nifi-flow-repository
chown -R 1000:1000 /tmp/nifi-flow-repository

# Create the providers configuration for the NiFi Registry
# no user/password because we use SSH authentication
cat <<EOF > /tmp/providers.xml
<providers>
  <flowPersistenceProvider>
    <class>org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider</class>
    <property name="Flow Storage Directory">/nifi-flow-repository</property>
    <property name="Remote To Push">origin</property>
    <property name="Remote Access User"></property>
    <property name="Remote Access Password"></property>
  </flowPersistenceProvider>
  <extensionBundlePersistenceProvider>
    <class>org.apache.nifi.registry.provider.extension.FileSystemBundlePersistenceProvider</class>
    <property name="Extension Bundle Storage Directory">./extension_bundles</property>
  </extensionBundlePersistenceProvider>
</providers>
EOF
chown 1000:1000 /tmp/providers.xml

# .ssh/config and .ssh/known_hosts files
mkdir /tmp/config
cat <<EOF >> /tmp/config/config
Host source.developers.google.com
  HostName source.developers.google.com
  IdentityFile /id_rsa
EOF
ssh-keyscan -p 2022 source.developers.google.com >> /tmp/config/known_hosts
chown -R 1000:1000 /tmp/config

# change chmod on the private key to allow access to 'nifi' user inside container
chown 1000:1000 /tmp/ssh/id_rsa

Note 1 — we are using templates to get up and running very quickly each time you want to start a new instance with the same configuration.

Note 2 — the above approach is not recommended as we are copying/pasting the private key in the startup script but this is due to the restrictions coming with the Container Optimized OS used for this demo. In a better world, we would use Cloud Build to have our own NiFi Registry image and use it instead. Or we could deploy the public image on Google Kubernetes Engine and use secrets.

Once your template is created you can open it and click “Create VM”:

Then you can give a name to your instance (let’s say ‘nifi-registry’) and start it. You should have an instance up and running:

After configuring the proper Firewall rule to allow access from your personal network to the instance on the port 18080, you should be able to access the NiFi Registry at http://<external IP>:18080/nifi-registry :

You can go in Settings (top right) and create a new bucket:

You now have a NiFi Registry up and running and you have initialized you first bucket. We can now deploy a NiFi instance, connect it with the Registry and create out first workflow.

Start a standalone NiFi instance

It’s very easy! Just go in Compute Engine / VM instances and click “Create instance”. Then just give a proper name to your instance and configure it to use the NiFi Docker image:

Start your VM and wait for few minutes. After configuring the Firewall rule to allow access from your personal network, you should be able to access NiFi on port 8080:

Go into the top-right hamburger menu and go into Controller Settings. Then go into the Registry Clients tab and click the + button to configure your registry:

You can now add a Process Group into the canvas, right click on it and start versioning:

You will notice that we can see the bucket we created in the registry. We can give a name to our workflow, a description, and a commit for this version.

Once we click save, we have the confirmation that the workflow has been correctly versioned:

We can check in our Cloud Source Repository that we do have data:

That’s it. You can now create a more complex workflow and commit the new version into the Registry, this will be saved into your repository. Even better, if you kill your NiFi Registry instance, and start a new one, you will be able to keep working and pull all the workflows you previously stored in the repository — all the metadata will be generated from the repository data at startup.

There is much more to come about NiFi on Google Cloud, stay tuned! Thanks for reading and feel free to comment and/or ask questions.

Deploying NiFi workflows on Google Cloud Run

Google Cloud just launched a new managed service call Cloud Run which is currently in public beta.

Cloud Run is a managed compute platform that enables you to run stateless containers that are invocable via HTTP requests. Cloud Run is serverless: it abstracts away all infrastructure management, so you can focus on what matters most — building great applications. It is built from Knative and let you run your containers fully managed with Cloud Run.

The huge benefit of this approach is: if there is no call to your service, there won’t be any container running and you won’t be charged. When a call is received, the container is instantiated to serve the request. If there are new requests, the container remains alive until there is no more request to answer. Besides, in case of traffic increase, your app will scale and more containers will be instantiated.

Also, when deploying a new version (revision) of your service, requests are automatically routed as soon as possible to the latest healthy service revision. This makes the whole CI/CD and service lifecycle much easier.

I wanted to play with this new service using NiFi: the idea is to have a workflow starting with HandleHttpRequest processor and ending with HandleHttpResponse processor to easily develop “functions” and deploy the web services in Cloud Run.

Note – everything used for this demo is available on this Github repo.

The workflow

Here is a simple workflow I developed and deployed on Cloud Run: this is a workflow expecting CSV data as input and converting the CSV data into JSON.

I want to emphasize that this is a very simple case. But think about all the great features you can use in NiFi following this approach. Possible use cases are pretty much infinite.

Basically, once the workflow is running I can use a POST HTTP request to get my JSON:

$ cat test.csv
name,company
Pierre Villard,Google

$ curl --data-binary "@test.csv" -H "Content-Type: text/csv" -X POST http://localhost:9090/
[ {
  "name" : "Pierre Villard",
  "company" : "Google"
} ]

This is a very simple workflow:

  • HandleHttpRequest – will start listening on a given port for HTTP(S) requests
  • ConvertRecord – will convert my CSV data into JSON using the header to infer the schema
  • HandleHttpResponse – return the result to the user
Simple workflow to convert CSV to JSON using a HTTP endpoint

Considerations for the container

The container must be stateless and listen for HTTP requests on $PORT that will be passed to the container as an environment variable. Have a look at the container runtime contract.

In the NiFi world, that means that the property defining the listening port in the HandleHttpRequest processor must accept Expression Language. Unfortunately, that is not the case and I submitted a fix with NIFI-6144. However, in Cloud Run container instances, the PORT environment variable is always set to 8080 (but for portability reasons, your code should not hardcode this value). This means, you don’t strictly need the fix I submitted to play with Cloud Run and NiFi, but it’s safer if you do.

The container, when started, must be able to serve the request within 4 minutes which means that we want a container able to start as quick as possible and with a low footprint. To do that, I chose to work with the MiNiFi Docker image instead of the NiFi one.

Converting the workflow for MiNiFi

Once the workflow is developed, you can save it as a template (XML file) and use the MiNiFi toolkit to convert the XML file into a YAML file that will be useable by MiNiFi.

# command to convert the template into yml file for MiNiFi
$MINIFI_HOME/bin/config.sh transform templateConvertRecord.xml config.yml

You can have a look at the YAML file on the Github repository.

Building the container

By default, the MiNiFi Docker image comes with a limited set of features and it’s up to you to add the elements you need to run your workflow if required. In my case, I added:

  • a custom build of the standard processors NAR to include my fix,
  • the HTTP context NAR that is required when using HandleHttpRequest/Response processors,
  • and the Record serialization NAR required for the ConvertRecord processor.

Here is my Dockerfile:

FROM apache/nifi-minifi:latest
USER root

ENV MINIFI_HOME /opt/minifi/minifi-0.5.0

ADD config.yml $MINIFI_HOME/conf/config.yml
ADD *.nar $MINIFI_HOME/lib/

RUN chown -R minifi:minifi $MINIFI_HOME

USER minifi

You then need to create your image and push it to the Container Registry service in the Google Cloud Platform. You can have a look at the documentation here.

git clone git@github.com:pvillard31/nifi-cloudrun-example.git
cd nifi-cloudrun-example
gcloud auth configure-docker
# docker build -t [HOSTNAME]/[PROJECT-ID]/[IMAGE]:[TAG] .
docker build -t eu.gcr.io/my-gcp-project/minifi-csvjson:0.0.1 .
# docker push [HOSTNAME]/[PROJECT-ID]/[IMAGE]:[TAG] .
docker push eu.gcr.io/my-gcp-project/minifi-csvjson:0.0.1

Deploy in Cloud Run

Once your Docker image is pushed in the Container Registry, you can go to the Cloud Run service and deploy your service!

Click on Create Service and fill the information:

Create a service in Cloud Run

Once your service is deployed, you will get a service HTTPS URL exposing your service:

I can now directly send requests to the exposed service:

$ time curl --data-binary "@test.csv" -H "Content-Type: text/csv" -X POST https://minifi-csvjson-t546z2l6aq-uc.a.run.app
[ {
  "name" : "Pierre Villard",
  "company" : "Google"
} ]
real	0m0.725s
user	0m0.036s
sys	0m0.026s

And I can also monitor my service in Stackdriver:

Conclusion

That’s it! You know how to deploy a new service in Cloud Run using the features of NiFi to develop your workflow and using the MiNiFi Docker image for running your containers.

Hopefully this demonstrates how easy it is to create and deploy a service and everything is managed for you in terms of scaling and lifecycle… and you’re paying only for what you really use!

In combination with the NiFi Registry and all the tools you have on the Google Cloud Platform you can have a very neat pipeline to automate all of it and deploy new services in seconds!

As always, feel free to ask questions and/or leave a comment.

NiFi 1.8+ – Revolutionizing the List/Fetch pattern and more…

If you read my post about List/Fetch pattern and if you’re using this approach for some of your workflows, this new feature coming with NiFi 1.8.0 is going to be a revolution.

A quick recap about the context: in NiFi, unless you specifically do something to make the nodes of a NiFi cluster exchange data, a flow file will remain on the same node from the beginning to the end of the workflow. For some use cases, it is necessary to load-balance the flow files among the nodes of the cluster to take advantage of all the nodes (like the List/Fetch pattern).

The only way to load balance data in a NiFi cluster before NiFi 1.8 is to use the Site-to-Site (S2S) protocol in NiFi with a Remote Process Group (RPG) connected to the cluster itself with an input/output port. In addition to that the S2S forces you to have the input/output port defined at the root level of the canvas.  In a multi-tenant environment this can be a bit annoying and can make the workflows a little bit more complex.

What’s the revolution with NiFi 1.8+? For intra-cluster load balancing of the flow files, you now can do it directly by configuring it on the relationship between two processors. It could sound like a minor thing, but that’s HUGE! In addition to that, you have few options to configure the load-balancing which opens up new possibilities for new use cases!

The List/Fetch pattern is described in my previous post: in short… it’s the action to use a first processor (ListX) only running on the primary node to list the available data on X and generate one flow file per object to retrieve on X (the flow file does not have any content but contains the metadata to be used to fetch the object), then flow files are distributed among the NiFi nodes and then the FetchX processor running on all nodes will take care of actually retrieving the data. This way you ensure there is no concurrent access to the same object and you distribute the work in your cluster.

List/Fetch pattern before NiFi 1.8.0

If we have a project A retrieving data from a FTP server using the List/Fetch pattern to push the data into HDFS, it’d look like this:

  • Root Process Group level

Screen Shot 2018-10-18 at 10.03.23 PM.png

  • Inside the Process Group dedicated to Project A

Screen Shot 2018-10-18 at 10.04.01 PM.png

The ListFTP is running on the primary node and sends the data to the RPG which load balances the flow files among the nodes. Flow files are pushed to the input port at the root level and the data can then be moved back down to the process group of the project. Then the FetchFTP actually retrieves the data and the data is sent to HDFS.

List/Fetch pattern with NiFi 1.8+

Now… it looks like this:

  • Root Process Group level

Screen Shot 2018-10-18 at 10.09.29 PM

  • Inside the Process Group dedicated to Project A

Screen Shot 2018-10-18 at 10.10.30 PM

It’s crazy, no? You don’t have anything outside of the process group anymore, the workflow is cleaner/simpler, and authorizations are much easier to manage.

Where is the trick? Did you notice the small icon on the relationship between the ListFTP and the FetchFTP? It looks small but it’s HUGE :).

Load balancing strategies in NiFi 1.8+

Let’s have a look at the configuration of a connection:

Screen Shot 2018-10-18 at 10.18.24 PM.png

There is a new parameter available: the load balance strategy. By default it defaults to “do not load balance” and, unless you need to, you won’t change that parameter (you don’t want to move data between your nodes at each step of the workflow if there is no reason to do so).

Here are the available strategies:

Screen Shot 2018-10-18 at 10.20.24 PM

The Round robin strategy is the one you would probably use in a List/Fetch use case. It will ensure your data is evenly balanced between your nodes.

The Single node strategy allows you to send the data back to one single node. You can see it a bit like a reducer in a MapReduce job: you process the data on all the nodes and then you want to perform a step on a single node. One example could be: I have a zip file containing hundreds of files, I unzip the file on one node, load balance all the flow files (using Round Robin strategy for example) among the nodes, process the files on all the nodes and then send back the flow files to a single node to compress back the data into a single file. It could look like this:

Screen Shot 2018-10-18 at 10.31.08 PM.png

Then you have the Partition by attribute strategy allowing you to have all the flow files sharing the same value for an attribute to be sent on the same node. For example, let’s say you receive data on NiFi behind a load balancer, you might want to have all the data coming from a given group of sources on the same node.

I won’t go into much more details, but feel free to have a look at the documentation. Besides, I’m sure other members of the Apache NiFi community will publish blog posts on this subject… such as this excellent one!

Let’s just give it a try with each strategy… I’m using a GenerateFlowFile (GFF) connected to an UpdateAttribute and we will list the flow files queuing in the relationship to check where the flow files are located. Besides, the GenerateFlowFile is configured to set a random integer between 2 and 4 for the attribute ‘filename’.

Let’s start with a GFF running on the primary node only with no load balancing strategy:

Screen Shot 2018-10-18 at 10.44.06 PM.png

My primary node being my node2 (I have node2, node3 and node4 in my cluster):

Screen Shot 2018-10-18 at 10.45.05 PM.png

I can confirm all the flow files are on the primary node:

Screen Shot 2018-10-18 at 10.46.37 PM.png

Let’s change the Load Balance strategy to Round Robin. We can confirm the data is evenly distributed:

Screen Shot 2018-10-18 at 10.50.02 PM

Let’s change the strategy to One single node. We can confirm the data is now back to a single node (not necessarily the primary node):

Screen Shot 2018-10-18 at 10.53.22 PM.png

And now let’s try the partitioning by attribute using the ‘filename’ attribute. We can confirm that all the flow files sharing the same value in the Filename column are on the same node:

Screen Shot 2018-10-18 at 10.55.21 PM.png

Screen Shot 2018-10-18 at 10.56.12 PM.png

Again, I expect to see additional blog posts on this subject with more technical insights on how it actually works and what you should consider in case your cluster is scaling up and down.

Also… this new feature comes with another one which is as much exciting: the offloading of nodes in a cluster (look at the Admin guide in the documentation). In case you want to decommission a node in a cluster, this will take care of getting back the data on the other nodes before you actually remove the node for good. This will be particularly useful when deploying NiFi on Kubernetes and scaling up and down your cluster!

As always, feel free to comment and/or ask questions! Thanks for reading!

Monitoring Driven Development with NiFi 1.7+

I already discussed the Monitoring Driven Development (MDD) approach in this post and already covered some monitoring topics around NiFi in few posts (reading the posts would probably help you for reading this one – or at least refresh your memory on some concepts). In this post I want to present once again the MDD approach with some new features of NiFi 1.7 making things a bit easier and cleaner.

What do I mean when talking about MDD in NiFi? When I say MDD, I refer to the fact that, when a team is developing a “core workflow” for a use case, the team should also develop a “monitoring workflow” that will be in charge of monitoring and reporting the health of the “core workflow” based on their business requirements.

How does this work in NiFi? This approach completely relies on two things:

  • A proper naming convention for the components used in the workflows
  • The use of the Site-to-Site Reporting Tasks (see here)

Basically, the idea is to setup the S2S reporting tasks to send data to an input port dedicated to monitoring, then basic routing and filtering is performed assuming projects are following the naming convention so that all the data related to a given “core workflow” is automatically routed to the associated “monitoring workflow”. This way, it’s up to each project to implement the monitoring they want for their use case based on the requirements and SLA they have.

What’s new in NiFi that’s going to help me with MDD?

There are quite few interesting JIRAs but the two most important ones are:

  • [NIFI-4814] – Add distinctive attribute to S2S reporting tasks (NiFi 1.6.0)
  • [NIFI-5122] – Add record writer to S2S Reporting Tasks (NiFi 1.7.0)

Example

For my example, the naming convention is the following: any component that should be monitored should be named with a prefix ABC_ where ABC is the three letters abbreviation representing the use case/project.

I have two projects, one named PJA and one named PJB.

Let’s say the project A is using HandleHttpRequest/Response to receive XML data over HTTP and has two monitoring requirements:

  • Send an email to the team in case a bulletin is generated
  • Monitor in Grafana the amount of valid/invalid data being processed

Let’s say the project B is using ListenTCPRecord for a use case around logs ingestion and has one monitoring requirement:

  • Send an email to the team in case back pressure is enabled

The development of the workflows is outside the scope of this post. I’ll keep things simple as the objective is to demonstrate the MDD approach, not how to implement the workflows of the specific use cases I chose.

A *very basic* implementation of the “core workflow” of PJA could be:

Screen Shot 2018-08-29 at 3.55.19 PM.png

A *very basic* implementation of the “core workflow” of PJB could be:

Screen Shot 2018-08-29 at 4.36.29 PM

Now I’m going to configure the S2S Reporting Tasks – one for Controller Status data and one for Bulletins data. Note that there is a new feature allowing users to select a Record Writer for the generated data. I’m going to use the Avro one for the reporting tasks so that I don’t have to care about the schemas at all. For that I need to create a Controller Service (in Controller Settings menu since this controller service is exclusively used by reporting tasks).

Screen Shot 2018-08-29 at 5.02.26 PM.png

After enabling the CS, I can configure my reporting tasks.

The one for bulletins:

Screen Shot 2018-08-29 at 5.03.59 PM

And the one for controller status:

Screen Shot 2018-08-29 at 5.05.21 PM

Note that you have additional S2S Reporting Tasks if needed. The one about provenance events can be particularly useful for some monitoring requirements such as latency.

At the root level of my canvas, I’ve something looking like:

Screen Shot 2018-08-29 at 5.45.38 PM.png

I’m receiving all the monitoring data sent by the reporting tasks in the ‘monitoring’ input port. Then in my ‘Monitoring’ process group, I have something looking like:

Screen Shot 2018-08-29 at 5.46.48 PM.png

You can notice I’m sending all the data to a ‘Technical Monitoring’. That’s because, the team in charge of administrating/monitoring the platform probably wants to also have a look at the monitoring data to ensure the full system is performing OK (common examples: send the bulletins to an external tool like ES/Kibana, send email alerts in case back pressure is enabled on a connection, etc).

On the upper part of the workflow, that’s where I’m processing the monitoring data to split it and route the monitoring data by project. First I’m using a RouteOnAttribute to route the data based on the type (bulletins, controller status, etc):

Screen Shot 2018-08-29 at 5.50.48 PM

Then I’m using an UpdateRecord processor to add a ‘project’ field that I’m computing based on the component name.

For bulletins:

Screen Shot 2018-08-29 at 5.55.34 PM

For controller status:

Screen Shot 2018-08-29 at 5.52.43 PM.png

Since the data has been sent in Avro format by the reporting task, I’m using an Avro reader Controller Service with default settings. And, from now on, I want the data to be in JSON (as it’ll be easier in case I want to send the data via an email or via HTTP). I’m configuring few controller services: one Avro Schema Registry containing the schemas (that you can retrieve in the additional details of the reporting task documentation, see below), one JSON Reader and one JSON Writer.

For instance, to get the schema of the S2S Bulletin Reporting Task, go on the reporting task list:

Screen Shot 2018-08-29 at 5.59.40 PM.png

Click on the documentation icon on the left:

Screen Shot 2018-08-29 at 6.00.21 PM.png

And then click Additional Details:

Screen Shot 2018-08-29 at 6.00.58 PM.png

When adding the schemas in the Avro Schema Registry, use the type of the reporting task as the name of the schema and don’t forget to add a ‘project’ field in the schemas:

Screen Shot 2018-08-29 at 6.02.28 PM.png

Now you can easily reference the schemas to use in the configuration of the JSON Reader/Writer using the expression language because the flow files generated by the reporting tasks have an attribute with the type of the source reporting task.

Example of attributes on a flow file generated by a RT:

Screen Shot 2018-08-29 at 6.28.56 PM.png

JSON Reader:

Screen Shot 2018-08-29 at 6.03.44 PM.png

JSON Writer:

Screen Shot 2018-08-29 at 6.04.13 PM.png

Now I’ve a ‘project’ field in my data with the project name (if the naming convention has been correctly used), then you can use a QueryRecord processor to route the data for each project:

Screen Shot 2018-08-29 at 6.07.17 PM.png

And we can confirm that only the data for project A is routed to the ‘Monitoring Workflow’ for project A. If looking at the data in the relationship routing data for PJA:

Screen Shot 2018-08-29 at 6.08.53 PM.png

Now, when deploying a new project for the first time (ie importing the process group of the “core workflow”), you just need to update the QueryRecord processor to add a dynamic property for the new project as well as deploying the process group for the “monitoring workflow” of the project. In combination with the NiFi Registry it makes things really smooth when you need to update the workflows with a new version.

I’m not going to develop the monitoring workflows for PJA and PJB but it’d only be a matter of few record processors to filter the data (bulletin? back pressure enabled?), and then use a processor to send the data (ElasticSearch, PutEmail, InvokeHttp, etc):

  • “send an email when a bulletin is generated” – RouteOnAttribute to get the bulletins data and PutEmail to send an email
  • “monitor the number of valid/invalid XML files being processed” – filter the controller status statistics on the ‘Connection’ elements and keep only the statistics of the connections after the ValidateRecord processor, this can be easily done with a QueryRecord processor. The numbers can then be sent to an external system such as ElasticSearch for a dashboard in Kibana/Grafana
  • “send an email when back pressure is enabled” – use a QueryRecord processor on the controller status data by querying the field ‘isBackPressureEnabled’ and then use PutEmail processor.

The full workflow of this example is available as a gist here. Note that, since it’s a template, it does not contain the Reporting Tasks configuration. But you should be up and running quite quickly with the above screenshots.

As always, thanks for reading this post, feel free to comment and/or ask questions. In case you have suggestions to make things even better, just let me know and, maybe, I’ll have to publish a new version of this post to present the new features of a next release 😉

NiFi 1.7+ – Terminate threads

One of the new features coming with NiFi 1.7.0 is the possibility to terminate stuck threads directly from the UI. Before this release, when you had a processor getting stuck (like a custom processor with a deadlock) you had no option but to restart NiFi… and that’s not really great in a multi-tenant setup.

Let’s see this new feature with an example: I’m using a GenerateFlowFile and, using the debug mode of my IDE, I’m going to simulate an issue with the thread by adding a break point in the onTrigger() method of the processor.

Screen Shot 2018-07-02 at 2.56.57 PM.png

When the processor is running, we can see the number of threads currently used by the processor (top right). If the number of tasks is not increasing (and equal to 0 after 5 minutes) and the number of threads is constant, it probably means you have a stuck thread. In this case, it is always recommended to do a thread dump of NiFi to see what is going on. To do that:

./bin/nifi.sh dump /tmp/thread-dump.txt

If we look at the content of the generated file and looking for my GenerateFlowFile processor, we can see something like:

"Timer-Driven Process Thread-5" Id=57 RUNNABLE (suspended)
 at org.apache.nifi.processors.standard.GenerateFlowFile.onTrigger(GenerateFlowFile.java:210)
 at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
 at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
 at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
 at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 Number of Locked Synchronizers: 1
 - java.util.concurrent.ThreadPoolExecutor$Worker@2388307

In this case, the thread is shown as “suspended” at the line where I put my break point in my IDE.

Note that having a stuck thread is usually a symptom of a badly designed processor or an underlying issue with the code dependencies of the processor. Having the thread dump can help locating and fixing the issue.

If trying to stop the processor:

Screen Shot 2018-07-02 at 3.02.23 PM.png

I’ll now see two threads being used by the processor:

Screen Shot 2018-07-02 at 3.03.09 PM

The processor is now in the process of being stopped but you won’t be able to update or restart the processor until it is actually stopped. However, if the initial thread is stuck, you’ll remain in this state forever (and, eventually, you’ll have to restart NiFi if you’re running a version below 1.7.0).

With NiFi 1.7.0, you now have the possibility to terminate the thread (meaning you don’t have to wait for the thread to be completely stopped):

Screen Shot 2018-07-02 at 3.06.58 PM

If I terminate the thread, here is what I’ll see:

Screen Shot 2018-07-02 at 3.08.07 PM

Meaning there is no more active thread and there is one thread being terminated.

Under the hood, the framework is issuing an “interrupt” for the thread and will perform a “reload” of the processor meaning there is a new instance of the processor class being created. This allows the old class to eventually shut down gracefully, close connections, etc. However this needs to be used with care: if the class is maintaining some values in internal variables, this information will be lost in the process (I’m not talking about state information that can be saved at framework level by the processors).

An interrupt is an indication to a thread that it should stop what it is doing and do something else. It’s up to the programmer to decide exactly how a thread responds to an interrupt, but it is very common for the thread to terminate. However if, for some reasons, the thread does not respond to interrupt, then you will keep the thread as being terminated forever (in my above example, I can see the thread staying in the state of being terminated because of my break point).

You can now update the configuration and start again the processor (even if a thread is being terminated). Once the thread is terminated, you’ll get back to a nominal situation:

Screen Shot 2018-07-02 at 4.54.51 PM.png

If you don’t want to setup a debug environment, you can simulate a stuck thread with the following ExecuteScript processor:

Screen Shot 2018-07-02 at 4.56.49 PM.png

If starting/stopping the processor:

Screen Shot 2018-07-02 at 6.02.37 PM.png

And I’ll get the following message when terminating the thread:

Screen Shot 2018-07-02 at 6.02.58 PM.png

You also can use the DebugFlow processor with “@OnStopped Pause Time” set to something like “5 min” and “Ignore Interrupts When Paused” set to “true”:

Screen Shot 2018-07-02 at 6.49.29 PM.png

That’s it for this post! This new feature is really nice in a multi-tenant environment where you can’t afford a restart of the service. If you’re in a situation where you need to use this feature, remember to take a thread dump before actually terminating the thread. This will be really useful to investigate the issue. Also, remember that terminating a thread is not a “normal” operation, it means there is something wrong somewhere and it could very likely happen again.

Thanks for reading this post and, as always, feel free to comment / ask questions!

NiFi 1.7+ – XML Reader/Writer and ForkRecord processor

Starting with NiFi 1.7.0 and thanks to the work done by Johannes Peter on NIFI-4185 and NIFI-5113, it’s now possible to use an XML reader and writer in the Record processors to help you processing XML data. Before that, you had few options requiring a bit of additional work to get things working (see here).

I won’t go into the details because the reader/writer are really well documented (have a look at the additional details for examples):

Also, with NIFI-4227, a ForkRecord processor is now available to “explode” your data when you have one or multiple embedded arrays in your data set and you need to normalize your data. Have a look at the ForkRecord documentation (additional details) for examples.

Example – Introduction

Let’s go through an example using the XML Reader and the ForkRecord processor. I assume I’m receiving XML data with the following schema:

Screen Shot 2018-06-27 at 10.39.20 AM.png

And here is a dummy file I’m receiving that I’ll use for this example:

Screen Shot 2018-06-27 at 2.15.28 PM.png

The corresponding Avro schema can be found here. Defining the Avro schema corresponding to your data is the most “difficult” part but once it’s done, everything else is really easy with the Record processors. You can get some help using the InferAvroSchema processor but this should only be used to get an initial version of your Avro schema when you start developing your workflows (it should not be used as a processor used in your production workflows).

I’m going to quickly explain the following workflow (template available here):

Screen Shot 2018-06-27 at 2.24.43 PM.png

Example – XML to JSON conversion

The GenerateFlowFile is used to generate my XML data and to send the content to the Record processors I’m using. The upper part of the workflow is just a ConvertRecord processor to perform the XML to JSON conversion thanks to the schema. The other parts of the workflows are used to extract each “object” of my schema (customer, address, account and transaction).

Here are the configuration details for the XML to JSON conversion. The GenerateFlowFile is very simple:

Screen Shot 2018-06-28 at 2.25.23 PM

Note the attribute ‘schema.name’ set with the value of the schema I added in my Avro schema registry controller service:

Screen Shot 2018-06-28 at 2.26.24 PM.png

The XML Reader is configured to get the schema name from the ‘schema.name’ attribute of the processed flow files:

Screen Shot 2018-06-28 at 2.27.21 PM.png

The JSON writer does not contain any specific configuration, it’ll write the data using the schema inherited at the reader level:

Screen Shot 2018-06-28 at 2.28.22 PM

The ConvertRecord processor can now be configured for the XML to JSON conversion:

Screen Shot 2018-06-28 at 2.29.20 PM.png

That’s it! You now have a very easy and powerful way to perform your XML to JSON conversion. Here the flow file content before the processor:

Screen Shot 2018-06-27 at 2.15.28 PM

Here is the JSON generated by the processor:

Screen Shot 2018-06-28 at 2.31.23 PM

If you need to manipulate your records to go from one schema to another, you can use the UpdateRecord processor.

Note – using the XML Reader/Writer and Record processors, you can expect performances as good as the best options presented in my previous post about XML data processing.

Example – ForkRecord processor

The goal here is to extract the data contained in the arrays into separated CSV files. From one XML file, I want to generate 4 CSV files that I could use, for instance, to send data into database tables.

Let’s start with the first one regarding customer data. I’m adding a ‘customer’ Avro schema in the Avro schema registry:

Screen Shot 2018-06-28 at 2.42.58 PM.png

I’m using an UpdateAttribute, to set the name of the ‘output.schema’ that I want to be used by the CSV Record Writer. Then, I’m just using a ConvertRecord processor to process from XML to JSON and I only keep the root level fields (customer data). Here is the configuration of my CSV writer:

Screen Shot 2018-06-28 at 2.45.23 PM.png

Here is the output when processing my XML data:

Screen Shot 2018-06-28 at 2.46.32 PM.png

Now, I want to extract data contained in arrays and that’s where the ForkRecord is useful. To use it, I need to specify a custom property containing the Record path to the array that the processor should process, then you have two modes for the processor:

  • Split – that will preserve the input schema but will create one flow file per element contained in the array.
  • Extract – that will generate flow files using the element of the arrays with the possibility to include the parent fields up to the root level in case you want to keep some fields as keys/identifiers of your arrays elements. Note that you can define which parent fields you want to preserve by setting the appropriate schema with only the fields you want.

In our case we’re using the Extract mode and we are including the parent fields: for the ‘address’ data for instance, we want to keep the customer_id field in the output (that field would be used as a key between the customer data and the address data in the database world). Here is the schema I define in my Avro schema registry for the ‘address’ object:

Screen Shot 2018-06-28 at 2.52.03 PM

I’m using an UpdateAttribute to set the attributes I’m using in my reader and the ForkRecord processor:

Screen Shot 2018-06-28 at 2.55.04 PM

And I’m configuring the ForkRecord processor to use the ‘fork.path’ attribute defining the array of elements to be processed:

Screen Shot 2018-06-28 at 2.55.59 PM.png

Note – if you have multiple Record paths pointing to multiple arrays with elements using the same output schema, you can define as many custom properties as you want.

And here is the output after the ForkRecord processor:

Screen Shot 2018-06-28 at 2.56.40 PM

Here are the Record paths I’m setting for the other type of objects I want to extract:

  • Account – fork.path = /accounts/account
  • Transaction – fork.path = /accounts/account[*]/transactions/transaction
    In this case, I need to specify account[*] because I want to access all the transactions of all the elements in the account array. But I could use the Record path features to only access some specific accounts based on some predicates.

And here the output I get (after defining the appropriate schema in my schema registry):

  • Account data (I only keep the customer_id from the parent fields):

Screen Shot 2018-06-28 at 3.02.14 PM.png

  • Transaction data (I only keep the customer_id and account_id from the parent fields):

Screen Shot 2018-06-28 at 3.03.55 PM

Conclusion

The XML Reader & Writer are a really nice addition to NiFi if you need to process XML data and it will make your existing workflows much simpler! You really have to look at the Record processors and concepts as soon as you’ve a use case manipulating data complying to a schema.

The ForkRecord processor is a new processor that you can use if you need to manipulate schema oriented data containing a lot of arrays. That can be useful in case you need to normalize data before ingestion into databases.

Thanks for reading this post and, as always, feel free to comment and/or ask questions!

NiFi workflow monitoring – Wait/Notify pattern with split and merge

Thanks to NIFI-4262 and NIFI-5293, NiFi 1.7.0 contains a small improvement allowing users to extend the Wait/Notify pattern to merging situations.

If you’re not familiar with the Wait/Notify concept in NiFi, I strongly recommend you to read this great post from Koji about the Wait/Notify pattern (it’ll be much easier to understand this post).

When using the Merge* processors, you have one relationship to route the flow file containing the merged data and one relationship to route the original flow files used in the merging operation. With NIFI-4262 for the MergeContent processor (and NIFI-5293 for the MergeRecord processor), the original flow files used in the merge operation will now contain an attribute with the UUID of the flow file generated by the merge operation. This way, you now have a way to link things together and that’s really useful to leverage the Wait/Notify pattern.

Use case presentation

Let’s demonstrate the feature with a use case: I’m receiving ZIP files containing multiple CSV files that I want to merge together while converting the data into Avro and send it into a file system (for simplicity here, I’ll send the data to my local file system, but it could be HDFS for instance).

The idea is the following:

ListFile -> FetchFile -> UnpackContent -> MergeRecord -> PutFile

That’s easy and it works great… BUT… I want to be able to monitor and track exactly how each ZIP file is handled and when the data contained in a ZIP file is stored in the destination. I have a requirement to maintain in real-time the following tables:

Table ‘zip_files_processing’

  • zip_file_name – unique name of the received ZIP files
  • ff_uuid – flow file UUID (useful to replay events in case of errors)
  • ingestion_date – date when the zip file starts being processing in NiFi
  • storage_date – date when all the data of the ZIP file has been processed
  • number_files – number of CSV files contained in the ZIP file
  • number_files_OK – number of files successfully stored in the destination
  • number_files_KO – number of files unsuccessfully processed
  • status – current status of the process for the ZIP file (IN_PROGRESS, OK, KO)

Table ‘files_ingestion’

  • zip_file_name – unique name of the zip file containing the CSV file
  • file_name – name of the CSV file
  • ff_uuid – flow file UUID (useful to replay events in case of errors)
  • storage_date – date when the CSV file has been processed
  • storage_name – name of the destination file containing the data
  • status – current status of the process for the CSV file (IN_PROGRESS, OK, KO)

That’s where the Wait/Notify is going to be extremely useful.

High level description of the workflow

Here is the main idea behind the workflow:

Screen Shot 2018-06-13 at 4.14.17 PM.png

Let’s describe the workflow at a high level:

  • List the data available and insert rows in my zip_files_processing table for each listed file so that I can track ingested data
  • Fetch the data (in case of error, I update the zip_files_processing table to set the status to KO)
  • I unpack my ZIP files to get a flow file per CSV file, and I route the original flow file to a Wait processor. The flow file representing the ZIP file will be released only once all the associated CSV files will be fully processed
  • For each CSV file, I insert a row in my files_ingestion table to track the status of each CSV file being processed
  • I merge the data together while converting the data from CSV to Avro, and I route the original flow files to a Wait processor. The flow files will be released only once the merged flow file will be sent to the final destination
  • I send the merged flow file to the destination (local file system in this case) and then use the Notify processor to release all the original flow files used to create the merged flow file
  • Then I use the released flow files to notify the first Wait processor and also release the flow file representing the corresponding ZIP file
  • With all the released flow files, I can update my monitoring tables with the appropriate information

Workflow details

Here is the full configuration of the workflow and some explanations around the parameters I used. Also, here is the template of the workflow I used – it’ll be easier to understand if you have a look, give it a try and play around with it (even without the SQL processors) – note that you will need to add a Distributed Map Cache server controller service to have the Distributed Map Cache clients working with Wait/Notify processors:

Screen Shot 2018-06-26 at 1.50.35 PM.png

Let’s start describing the ingestion part:

Screen Shot 2018-06-26 at 1.52.57 PM.png

Nothing very strange here. Just note the UpdateAttribute processor I’m using to store the original UUID to a dedicated attribute so that I don’t loose it with the UnpackContent processor (that creates new flow files with new UUIDs).

Screen Shot 2018-06-26 at 1.56.13 PM

The first PutSQL is used to insert new lines in my ZIP monitoring table for each ZIP file I’m listing. I’m executing the below query:

Screen Shot 2018-06-26 at 1.55.31 PM

The second PutSQL is used in case of error when fetching the file from the local file system or when an error occurs while unpacking the archive. In such a case, I’m executing the below query:

Screen Shot 2018-06-26 at 1.57.24 PM.png

Let’s now focus on the second part of the workflow:

Screen Shot 2018-06-26 at 1.58.28 PM

The UnpackContent processor is used to extract the CSV files from the ZIP file. For one flow file representing the ZIP file, it will generate one flow file per CSV files contained in the ZIP file. The original flow file will then be routed in the original relationship while flow files for CSV data will be routed in the success relationship. The flow files containing the CSV data will have a ‘fragment.identifier’ attribute and that’s the common attribute between the original flow file and the generated ones that I’m going to use for Wait/Notify (this attribute is automatically generated by the processor). Besides the original flow file also has a ‘fragment.count’ attribute containing the number of generated flow files for the CSV data.

The Wait processor is configured as below:

Screen Shot 2018-06-26 at 2.03.46 PM

Basically I’m setting the identifier of the release signal to the common attribute shared between the original and the generated flow files, and I’m saying that I have to wait for ‘fragment.count’ signal before releasing the original flow file to the success relationship. Until then, the flow file is transferred to the ‘wait’ relationship which is looping back on the Wait processor. Also, I configured a 10 minutes timeout in case I didn’t received the expected signals, and in this case the flow file would be routed to ‘expired’ relationship.

After my Wait processor, in case of error, I’m using an UpdateAttribute to retain this error:

Screen Shot 2018-06-26 at 2.08.50 PM.png

And after that, I’m using the PutSQL processor to update the ZIP monitoring table by executing the below query:

Screen Shot 2018-06-26 at 2.09.46 PM.png

The ‘wait.*’ attributes are generated by the Wait processor when releasing the flow file and contain the number of signals sent by the Notify processor (I’ll come back to that when describing the corresponding Notify processor). In this case, it allows me to know how many files contained in the ZIP file have been successfully processed and how many have not been successfully processed.

After the UnpackContent and the success relationship, I’m using a PutSQL to insert lines in my CSV monitoring table by executing the below query:

Screen Shot 2018-06-26 at 2.19.41 PM

The ‘segment.original.filename’ is the name of the ZIP file from which the CSV files have been extracted.

Let’s move to the final part of the workflow (it’s a bit loaded because I wanted to have everything in my screenshot):

Screen Shot 2018-06-26 at 2.27.15 PM.png

The MergeRecord is used to merge the data together while converting from CSV to Avro:

Screen Shot 2018-06-26 at 2.28.11 PM

In a real world use case, you would configure the processor to merge much more data together… but for the below demonstration I’m merging few flow files together so that I can quickly see the results.

Also I’m generating dummy data like:

Screen Shot 2018-06-26 at 2.29.41 PM.png

And the corresponding schema I’m using in the Avro Schema Registry controller service is:

Screen Shot 2018-06-26 at 2.30.55 PM.png

After the MergeRecord processor, there is the ‘original’ relationship where are routed the flow files used during the merge operation, and there is the ‘merged’ relationship where are routed the flow files containing the merged data. As explained above, the original flow files are containing a ‘merge.uuid’ attribute with the UUID of the merged flow file. That’s the attribute I’m using in the Wait processor configuration:

Screen Shot 2018-06-26 at 2.35.49 PM

Basically, I’m creating a signal identifier in the distributed cache with the ‘merge.uuid’ processor and as long as the signal counter is greater than 1, I’m releasing one flow file at a time. You’ll see in my Notify configuration (coming back to that in a bit) that, once my merged data is processed, I’m using the Notify processor to set the signal counter associated to the identifier with a value equal to the number of merged flow files. This way, once there is the notification, all the corresponding flow files will be released one by one.

I won’t describe the PutFile processor which is used to store my data (it could be a completely different processor), but just notice the UpdatAttribute I’m using in case of failure:

Screen Shot 2018-06-26 at 2.40.47 PM.png

And the UpdateAttribute I’m using in case of success:

Screen Shot 2018-06-26 at 2.41.24 PM

Note that I’m setting attributes with the same prefix ‘wn_ingestion.*’. Then, look at my Notify processor:

Screen Shot 2018-06-26 at 3.13.05 PM.png

The ‘uuid’ attribute is equal to the ‘merge.uuid’ I mentioned above for the original flow files used in the merge operation. Besides, I’m using the Attribute Cache Regew property to copy the corresponding attributes to all the flow files going to be released thanks to this signal. It means that when a merged flow file is sent to the Notify processor, all the original flow files with the corresponding in the ‘merge.uuid’ in the ‘wait’ relationship will be released and will get the ‘wn*’ attributes of the merged flow file. That’s how I’m “forwarding” the information about when the merged flow file has been sent to the destination, if the merged flow file has been successfully processed or not, and what is the filename used to save the file in the destination.

Once the original flow files are released from the Wait processor, they are sent to the Notify processor with the below configuration:

Screen Shot 2018-06-26 at 3.18.24 PM.png

In this case, I’m using the attribute inherited from the merged flow file sent to the destination to determine if the processing has been successful or not. If yes, then I’m setting the counter name to ‘OK’, otherwise it’s set to ‘KO’. That’s how I’m getting the information about how many CSV files from a given ZIP have been successfully processed or not. (remember the ‘wait.counter.OK’ and ‘wait.counter.KO’ attributes created when the flow file corresponding to the ZIP file is released?).

And now… I can execute my SQL query to update my CSV monitoring table with the latest information:

Screen Shot 2018-06-26 at 3.22.09 PM

Demonstration

Let’s see how the workflow is working and what’s the result in all kind of situations. I’m running a Postgres instance in a Docker container to store my monitoring data. Here are the statements I used to create the two tables.

CREATE TABLE IF NOT EXISTS zip_files_processing (
 zip_file_name varchar(255),
 ff_uuid varchar(255),
 ingestion_date timestamp DEFAULT current_timestamp,
 storage_date timestamp,
 number_files integer,
 number_files_OK integer,
 number_files_KO integer,
 status varchar(15)
);
CREATE TABLE IF NOT EXISTS files_ingestion (
 zip_file_name varchar(255),
 file_name varchar(255),
 ff_uuid varchar(255),
 storage_date timestamp,
 storage_name varchar(255),
 status varchar(15)
);

Case 1 – nominal situation

In this case, all the received data is OK and all is working flawlessly… Let’s say I’m receiving two ZIP files. Once the data is received, I can see something like:

Screen Shot 2018-06-15 at 11.53.58 PM

 

Then the ZIP files are unpacked, and I can see my CSV files in the other table:

Screen Shot 2018-06-15 at 11.54.42 PM

Then the data is merged and sent to the destination. At the end, my tables looks like:

Screen Shot 2018-06-15 at 11.56.28 PM.png

I can see that the data coming from the two ZIP files has been merged into the same final file and everything is OK. In the ZIP monitoring table, I have:

Screen Shot 2018-06-15 at 11.57.55 PM

Case 2 – cannot fetch ZIP file

In this case, the FetchFile processor is failing due to some permission issues. Here is what I get in my ZIP monitoring table:

Screen Shot 2018-06-16 at 12.04.35 AM

Note that I could a ‘comment’ column with the error cause.

Case 3 – cannot unpack ZIP file

Result will be identical to case 2 as I’m handling the situation in the same way.

Case 4 – cannot merge records because a CSV file is not schema valid

I’m processing 2 ZIP files but a CSV file contained in one of the ZIP file cannot be merged with the others because it does not respect the schema. Note that all the flow files used during a merge operation will be sent to the failure relationship if one of the flow file is not valid. That’s why multiple flow files will be routed to the failure relationship even though there is only one bad file. To avoid this situation, it would be possible to add an additional step by using the ValidateRecord processor.

For my ZIP files, I have:

Screen Shot 2018-06-22 at 8.42.00 PM

At the end, for this run, I have:

 

Screen Shot 2018-06-22 at 8.44.01 PM.png

Case 5 – cannot send a merged file to destination

In this case I’m simulating an error when sending a merged flow file to the destination. Here is the result at the end for the ZIP table:

Screen Shot 2018-06-22 at 9.57.09 PM

And the CSV files table:

Screen Shot 2018-06-22 at 9.57.44 PM

Conclusion

I’ve covered a non exhaustive list of situations and the workflow could be improved but it gives you a good idea on how the Wait/Notify pattern can be used in a NiFi workflow to help monitoring what’s going on when dealing with split / merge situations. In addition to that, it’d easy to add an automatic mechanism to try replaying failed flow files using the UUID and the provenance repository replay feature of NiFi.

Thanks for reading this post and, as always, feel free to comment and/or ask questions!