Secured NiFi cluster with Terraform on the Google Cloud Platform

This story is a follow-up to my previous story about deploying a single secured NiFi instance, configured with OIDC, using Terraform on the Google Cloud Platform. This time, it’s about deploying a secured NiFi cluster.

In this story, we’ll use Terraform to quickly:

  • deploy a NiFi CA server as a convenient way to generate TLS certificates
  • deploy an external ZooKeeper instance to manage cluster coordination and state across the nodes
  • deploy X secured NiFi instances clustered together
  • configure NiFi to use OpenID Connect for authentication
  • configure an HTTPS load balancer with Client IP affinity in front of the NiFi cluster

Note — I assume you have a domain that you own (you can get one with Google). It will be used to map a domain to the web interface exposed by the NiFi cluster. In this post, I use my own domain: pierrevillard.com and will map nifi.pierrevillard.com to my NiFi cluster.

Disclaimer: the steps below should not be used for a production deployment. They can definitely get you started, but I’m only using them to start a secured cluster: there is none of the configuration one would expect for a production setup, such as a clustered ZooKeeper, disks for the repositories, etc.

If you don’t want to read the story and want to get straight into the code, it’s right here!


What is Terraform?

Terraform is a tool for building, changing, and versioning infrastructure safely and efficiently. Terraform can manage existing and popular service providers as well as custom in-house solutions.

Configuration files describe to Terraform the components needed to run a single application or your entire datacenter. Terraform generates an execution plan describing what it will do to reach the desired state, and then executes it to build the described infrastructure. As the configuration changes, Terraform is able to determine what changed and create incremental execution plans which can be applied.

The infrastructure Terraform can manage includes low-level components such as compute instances, storage, and networking, as well as high-level components such as DNS entries, SaaS features, etc.


What is NiFi?

Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data. Apache NiFi supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic. In simpler words, Apache NiFi is a great tool to collect and move data around, process it, clean it and integrate it with other systems. As soon as you need to bring data in, you want to use Apache NiFi.


Why ZooKeeper?

Apache NiFi clustering

It is best to refer to the documentation but, in short, NiFi employs a Zero-Master Clustering paradigm. Each node in the cluster performs the same tasks on the data, but each operates on a different set of data. One of the nodes is automatically elected (via Apache ZooKeeper) as the Cluster Coordinator. All nodes in the cluster then send heartbeat/status information to this node, and this node is responsible for disconnecting nodes that do not report any heartbeat status for some amount of time. Additionally, when a new node elects to join the cluster, the new node must first connect to the currently elected Cluster Coordinator in order to obtain the most up-to-date flow.
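
To make the heartbeat mechanism described above more concrete, here is a toy Python sketch of how a coordinator could flag the nodes whose last heartbeat is too old. This is not NiFi’s implementation: the function name, the node names and the 30-second timeout are all assumptions made for illustration.

```python
def find_stale_nodes(last_heartbeat, now, timeout_s):
    """Return the nodes whose last heartbeat is older than timeout_s seconds."""
    return sorted(
        node
        for node, seen_at in last_heartbeat.items()
        if now - seen_at > timeout_s
    )


# Hypothetical node names and timestamps (in seconds), for illustration only.
heartbeats = {"nifi-1": 100.0, "nifi-2": 96.0, "nifi-3": 61.0}

# nifi-3 was last seen 40 seconds ago, which exceeds the 30-second timeout.
print(find_stale_nodes(heartbeats, now=101.0, timeout_s=30.0))
```

In a real cluster the coordinator itself can also fail, which is why its election is delegated to ZooKeeper.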


OAuth Credentials

The first step is to create the OAuth credentials (at the time of writing, this cannot be done using Terraform).

  • In your GCP project, go to APIs & Services, then Credentials.
  • Click on Create credentials, OAuth client ID. Select Web application.
  • Give it a name like “NiFi”. For Authorized JavaScript origins, use your own domain. I’m using: https://nifi.pierrevillard.com. For Authorized redirect URIs, I’m using: https://nifi.pierrevillard.com/nifi-api/access/oidc/callback. Please adapt this to your own domain (note that there is no port, as we’ll use the load balancer to access the cluster).
  • Click Create
Create the OAuth credentials

Once the credentials are created, you will get a client ID and a client secret that you will need in the Terraform variables.

By creating the credentials, your domain will be automatically added to the list of the “Authorized domains” in the OAuth consent screen configuration. It protects you and your users by ensuring that OAuth authentication is only coming from authorized domains.

Download the NiFi binaries in Google Cloud Storage

In your GCP project, create a bucket in Google Cloud Storage. We are going to use the bucket to store the Apache NiFi & ZooKeeper binaries (instead of downloading directly from the Apache repositories at each deployment), and also as a way to retrieve the certificates that we’ll use for the HTTPS load balancer.

Note — you’ll need Apache ZooKeeper 3.5.5+.

You can download the binaries using the below links:

Here is what it looks like:

Content of the bucket in Google Cloud Storage

Note — you’ll need to use the NiFi Toolkit version 1.9.2

Deploy NiFi with Terraform

Once you have completed the above prerequisites, installing your NiFi cluster will only take a few minutes. Open your Google Cloud Console in your GCP project and run:

Deploy script

If you execute the above commands, you’ll be prompted for the information below. However, if you don’t want to be prompted, you can update the variables.tf file with your own values before deploying everything.

Variables to update:

  • project // GCP Project ID
  • nifi-admin // Google mail address for the user that will be the initial admin in NiFi
  • san // FQDN of the DNS mapping that will be used to access NiFi. Example: nifi.example.com
  • proxyhost // FQDN:port that will be used to access NiFi. Example: nifi.example.com:8443
  • ca_token // The token to use to prevent MITM between the NiFi CA client and the NiFi CA server (must be at least 16 bytes long)
  • oauth_clientid // OAuth Client ID
  • oauth_secret // OAuth Client secret
  • instance_count // Number of NiFi instances to create
  • nifi_bucket // Google Cloud Storage bucket containing the binaries
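
For the ca_token variable, any random string of at least 16 bytes should do. Here is a minimal Python sketch, assuming a hex-encoded random value is acceptable as a token; the helper names are mine and are not part of the Terraform project.

```python
import secrets

MIN_TOKEN_BYTES = 16  # minimum length stated for ca_token


def generate_ca_token(random_bytes=16):
    """Return a random hex string; 16 random bytes give 32 ASCII characters."""
    return secrets.token_hex(random_bytes)


def is_valid_ca_token(token):
    """Check that the token is at least 16 bytes long once UTF-8 encoded."""
    return len(token.encode("utf-8")) >= MIN_TOKEN_BYTES


token = generate_ca_token()
print(token, is_valid_ca_token(token))
```

The generated value can then be pasted when Terraform prompts for ca_token, or written into variables.tf.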

Here is what it looks like on my side (after updating the variables.tf file):

Execution of the deploy script

Explanations

The first step is to deploy the NiFi Toolkit on a single VM to run the CA server that is used to generate certificates for the nodes and the load balancer. Once the CA server is deployed, a certificate is generated for the load balancer and pushed to the Google Cloud Storage bucket.

The script you started waits until the load balancer certificate files are available on GCS. Once they are, the files are retrieved locally to execute the remaining parts of the Terraform template, which deploys the ZooKeeper instance as well as the NiFi instances and the load balancer in front of the cluster. All the configuration on the NiFi instances is done for you. Once the script execution is completed, the certificate files are removed (locally and on GCS).
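
The waiting step can be pictured as a simple polling loop. The deploy script itself is not reproduced in this story, so the Python sketch below only illustrates the idea: wait_until and the stand-in check are hypothetical names, and the timeout and interval values are arbitrary.

```python
import time


def wait_until(check, timeout_s=600.0, interval_s=10.0,
               sleep=time.sleep, clock=time.monotonic):
    """Call check() until it returns True or timeout_s elapses.

    Returns True if the check succeeded, False if the timeout was reached.
    """
    deadline = clock() + timeout_s
    while True:
        if check():
            return True
        if clock() >= deadline:
            return False
        sleep(interval_s)


# Stand-in check that succeeds on the third call. A real check could, for
# example, test whether the certificate object exists in the bucket
# (an assumption; the actual script may do it differently).
attempts = {"count": 0}


def fake_check():
    attempts["count"] += 1
    return attempts["count"] >= 3


print(wait_until(fake_check, timeout_s=5.0, interval_s=0.01))
```

Injecting the check function keeps the loop independent from the storage backend, which also makes it easy to test without touching GCS.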

After 5 minutes or so…

The load balancer has been created and you can retrieve the public IP of the load balancer:

Retrieve the external public IP of the HTTPS load balancer

You can now update the DNS records of your domain to add a DNS record of type A pointing nifi.pierrevillard.com to the load balancer IP.

I can now access the NiFi cluster using https://nifi.pierrevillard.com and authenticate on the cluster using the admin account email address I configured during the deployment.

Here is my 6-node secured NiFi cluster up and running:

6-node secured NiFi cluster
6 nodes with the elected primary and coordinator nodes

I can now update the authorizations and add additional users/groups.

Note — you could use Google certificates instead of the ones generated with the CA server to remove the warnings about untrusted certificate authority.

Cleaning

To destroy all the resources you created, you just need to run:

terraform destroy -auto-approve

As usual, thanks for reading, and feel free to ask questions or comment on this post.

Running visual quality inspection at the edge with Google Cloud and Apache NiFi & MiNiFi

On the 23rd of October 2019, I gave a talk at ApacheCon in Berlin about running visual quality inspection at the edge. This story describes the talk and helps you use both Google Cloud and Apache NiFi & MiNiFi to continuously run updated TensorFlow models at the edge.

Context

Picture this: you are in a factory making cookies… you have thousands of cookies going through your production lines every day and you want to make your customers happy. Problem is… a broken cookie makes an unhappy customer! So you want a way to detect the broken cookies before they get into the final packaging.

This is the example I chose to support this story, but it could really be anything related to visual quality inspection, which spans multiple industries such as retail, manufacturing, energy, etc.: anything you can think of that would need an ML model trained over a dataset of images.

What is Apache NiFi & MiNiFi?

Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data. Apache NiFi supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic. MiNiFi — a subproject of Apache NiFi — is a complementary data collection approach that supplements the core tenets of NiFi in data flow management, focusing on the collection of data at the source of its creation.

In simpler words, Apache NiFi and MiNiFi provide a great combination of tools and features to collect and move data around, process it, clean it and integrate it with other systems. As soon as you need to bring data in, you want to use Apache NiFi.

What is Google Cloud Vision?

Google Cloud Vision allows you to derive insights from your images in the cloud or at the edge with AutoML Vision or use pre-trained Vision API models to detect emotion, understand text, and more.

With Google Cloud Vision, you have two options:

  • AutoML Vision: automate the training of your own custom machine learning models. Simply upload images and train custom image models with AutoML Vision’s easy-to-use graphical interface; optimize your models for accuracy, latency, and size; and export them to your application in the cloud, or to an array of devices at the edge.
  • Vision API: Google Cloud’s Vision API offers powerful pre-trained machine learning models through REST and RPC APIs. The APIs can classify and label images into millions of predefined categories, detect objects and faces, read printed and handwritten text, build valuable metadata into your image catalog, moderate content, etc.

In this story we are going to build our own custom machine learning models with our own labels and images using AutoML Vision.

Objective: continuous model retrain

Automatic retrain of the custom model

The objective is to have an end-to-end system to detect the defects while having a continuous model retrain with the newly collected images. Key benefits are:

  • Automatically train customized ML models in the cloud
  • Efficiently acquire images, label images, deploy model and run inference
  • Continuously refresh models using fresh data from the production lines
  • Minimal human interaction
  • ML model accuracy improving over time

Using a Raspberry Pi at the edge

To collect data from the production lines, I’m going to use a Raspberry Pi on which I’ll be running Apache MiNiFi. I added a camera to take pictures as well as LEDs to give visual indications of what is currently being processed.

Raspberry Pi 3 used to collect data

In order to collect data from the edge with the Raspberry Pi, I’m going to use Google IoT Core which is a fully managed service to easily and securely connect, manage, and ingest data from globally dispersed devices.

The first step is to create a Device Registry, which is a container of devices with shared properties. Once the Registry is created, you can register your device with its device identity. To do that, you need to generate a public/private key pair that will be used by the device to authenticate against Google IoT Core (you can also use Certificate Authorities). Here are the commands I used to generate my key pair for the Raspberry Pi.

My device is registered in my device registry

Then I’ll be using the MQTT protocol to send data between my device and Google Cloud. The data is automatically made available in a Pub/Sub topic:

Interface between the device and Google Cloud IoT Core

To communicate with Google Cloud IoT Core through MQTT, I’ll be using a processor that is currently under code review in a Pull Request on GitHub. I’ll also make this processor available in a repo I created to share the code snippets of my talk.
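
Whatever MQTT client is used, the bridge expects a specific client ID and topic layout. The Python sketch below builds these strings as I understand them from the Google Cloud IoT Core MQTT documentation; the helper name and the sample identifiers are hypothetical, and the custom processor mentioned above may expose these settings differently.

```python
def iot_core_mqtt_names(project_id, region, registry_id, device_id):
    """Build the MQTT client ID and topics for one IoT Core device."""
    client_id = (
        f"projects/{project_id}/locations/{region}"
        f"/registries/{registry_id}/devices/{device_id}"
    )
    return {
        "client_id": client_id,
        # Telemetry published here is forwarded to the registry's Pub/Sub topic.
        "telemetry_topic": f"/devices/{device_id}/events",
        # The device subscribes to these two to receive configs and commands.
        "config_topic": f"/devices/{device_id}/config",
        "commands_topic": f"/devices/{device_id}/commands/#",
    }


# Sample identifiers are placeholders, not the ones used in my setup.
names = iot_core_mqtt_names("my-project", "europe-west1", "my-registry", "my-pi")
for key, value in names.items():
    print(f"{key}: {value}")
```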

One problem, two architectures

Once we have collected pictures from our devices, we’ll be able to train our custom model. We’ll then have to choose between two options:

  • Deploy the model in Google Cloud and do the inference in the cloud. It means that, for each picture, the Raspberry Pi with Apache MiNiFi will make an HTTPS call to the served model to get the label of the picture. While this is efficient and easy to implement and scale, it means making a call over the Internet and a longer inference time. In this story, that’s what I’ll call the “cloud” mode when configuring the device. In the cloud mode, here is what the architecture looks like:
Architecture in the “cloud” mode
  • Download the model from Google Cloud on the Raspberry Pi and do the inference at the edge using Apache MiNiFi. Google Cloud provides multiple options to download your custom models with various accuracy versus latency parameters. Running the model at the edge will lead to a much faster inference time. In this story, that’s what I’ll call the “edge” mode when configuring the device. In the edge mode, here is what the architecture looks like:
Architecture in the “edge” mode

We’ll also see that, in the edge mode, it’s possible to get an even better inference time using Google’s hardware with the Coral Edge TPU.

Collecting pictures

Here is what the workflow running in MiNiFi (on the Raspberry Pi) looks like:

Workflow running in MiNiFi on the Raspberry Pi to collect images

The workflow is really simple: a “standalone” processor executes a Python script to take pictures at a given frequency using the Pi camera. Then the pictures are fetched into MiNiFi to be sent to Google Cloud IoT Core through the MQTT processor. I’m also switching on/off the amber LED when this process occurs to give a visual indication on the Pi about what’s going on.

As we said, the data will be automatically sent to a Pub/Sub topic and made available for consumption. In the Google Cloud Platform, I’m running a standalone secured Apache NiFi instance (see my previous post for a very quick deployment using Terraform) on which the below workflow is running:

Workflow running in NiFi in GCP to ingest images

The pictures are pulled from my topic’s subscription, then the images are stored in Google Cloud Storage and I’m using the Google Cloud Vision API to add the pictures into my dataset (which is the collection of images I’m using to train my custom models).

I won’t go too much into the details as the documentation is self-explanatory but here is the call I’m making — it gives the GCS path of the CSV file I created listing the GCS paths of all the images I ingested during the last X minutes:

curl \
  -X POST \
  -H "Authorization: Bearer $(gcloud auth application-default print-access-token)" \
  -H "Content-Type: application/json" \
https://automl.googleapis.com/v1beta1/projects/${projectID}/locations/us-central1/datasets/${datasetID}:importData \
  -d '{
        "inputConfig": {
          "gcsSource": {
            "inputUris": "gs://${gcs.bucket}/dataset.csv"
          }
        }
      }'

Note 1: the dataset ID is something looking like: ICN4695798657952251904.
Note 2: in NiFi, I externalized a lot of variables (project ID, GCS bucket, dataset ID, etc.) that I reuse across the workflow to make it much easier to use and configure.
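
The CSV file referenced in the call above is just a list of GCS paths, one image per row. Here is a minimal Python sketch of how such a file could be produced, assuming the optional second column holds a label when one is known; the function name and the sample paths are hypothetical.

```python
import csv
import io


def build_import_csv(image_uris, labels=None):
    """Return CSV text with one image per row: gs:// path, then an optional label."""
    labels = labels or {}
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for uri in image_uris:
        if not uri.startswith("gs://"):
            raise ValueError(f"not a GCS path: {uri}")
        label = labels.get(uri)
        # Unlabeled images get a single-column row and are labeled later in the UI.
        writer.writerow([uri, label] if label else [uri])
    return buffer.getvalue()


# Placeholder bucket and file names, for illustration only.
uris = ["gs://my-bucket/images/0001.jpg", "gs://my-bucket/images/0002.jpg"]
print(build_import_csv(uris, labels={"gs://my-bucket/images/0002.jpg": "OK"}))
```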

Managing your device configuration

Once your device is registered in the Device Registry in Google Cloud IoT Core, you can use the UI to send configuration updates or commands to your devices. Here is what it looks like:

Updating the configuration of my device through MQTT

On the Raspberry Pi, in MiNiFi, the configuration updates and commands are received by the MQTT processor and I process the received data to update the configuration of my device:

Reception of configuration updates and commands
Processing configuration updates

When I receive a configuration update, I process the JSON payload to extract the mode and whether or not I should use the Edge TPU. I then store this information in a local cache that is also persisted on disk in case of a device restart.
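
In MiNiFi this is done with processors, but the logic can be sketched in a few lines of Python. The JSON field names ("mode" and "use_tpu") and the helper names below are assumptions for illustration; the payload format is whatever you decide to send from the IoT Core UI.

```python
import json
import os
import tempfile

VALID_MODES = {"cloud", "edge"}


def parse_config_update(payload):
    """Extract the mode and the Edge TPU flag from a JSON configuration update."""
    data = json.loads(payload)
    mode = data.get("mode")
    if mode not in VALID_MODES:
        raise ValueError(f"unsupported mode: {mode!r}")
    return {"mode": mode, "use_tpu": bool(data.get("use_tpu", False))}


def save_config(config, path):
    """Write to a temporary file first so a restart never sees a partial file."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as handle:
        json.dump(config, handle)
    os.replace(tmp_path, path)


def load_config(path, default=None):
    """Reload the last persisted configuration, for example after a restart."""
    try:
        with open(path) as handle:
            return json.load(handle)
    except FileNotFoundError:
        return default


print(parse_config_update('{"mode": "edge", "use_tpu": true}'))
```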

Training and serving your model

We now have a device that we are able to configure from anywhere, that is automatically sending pictures from our production lines, and the pictures are automatically stored and added to our Google Cloud Vision dataset to train our custom models.

The next step, in Google Cloud Vision, is to create labels that will be used to classify our images. In this case, I only create two labels:

  • OK: the cookie looks good and can go into the packaging
  • NOK: the cookie is broken and should be removed

In the UI, I’m able to manually label the images and I need, at least, 10 images per label to start training my custom model (the more you have, the better the results are, and that’s why we want to continually ingest new images and retrain our model over time):

My dataset with the pictures of my cookies and my labels

In my NiFi instance running in GCP, I created a workflow which is used to trigger a model training every day in order to take into account the newly captured images and improve the model’s accuracy over time:

Model training is triggered daily in NiFi

To start the custom model training and wait until the training is completed I’m using the REST APIs described in the documentation. However, at this stage, you have to specify if you want to train a model that will run in the cloud (cloud mode) or at the edge (edge mode):

When training a model to be exported at the edge, you can specify a parameter allowing you to choose if you prefer latency over accuracy:

  • mobile-low-latency-1 for low latency,
  • mobile-versatile-1 for general purpose usage, or
  • mobile-high-accuracy-1 for higher prediction quality.
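
As a rough illustration, here is a Python sketch that builds the body of a training request for either mode. The field names (displayName, datasetId, imageClassificationModelMetadata, trainBudget, modelType) and the "cloud" model type reflect my recollection of the v1beta1 AutoML REST API and should be checked against the documentation; the function name and the display name are hypothetical.

```python
import json

EDGE_MODEL_TYPES = {
    "mobile-low-latency-1",
    "mobile-versatile-1",
    "mobile-high-accuracy-1",
}


def build_training_request(display_name, dataset_id, model_type="cloud",
                           train_budget_hours=1):
    """Build the JSON body for a model training request (assumed field names)."""
    if model_type != "cloud" and model_type not in EDGE_MODEL_TYPES:
        raise ValueError(f"unknown model type: {model_type}")
    return {
        "displayName": display_name,
        "datasetId": dataset_id,
        "imageClassificationModelMetadata": {
            "trainBudget": str(train_budget_hours),
            "modelType": model_type,
        },
    }


# The dataset ID reuses the example format given earlier in this story.
body = build_training_request(
    "cookies_edge", "ICN4695798657952251904", "mobile-low-latency-1"
)
print(json.dumps(body, indent=2))
```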

When a cloud-hosted model is being trained, the Google Cloud Vision UI would look like this:

Cloud-hosted model currently being trained

Once the model training is completed, NiFi will receive a JSON payload such as:

JSON payload when model training is completed

Once the model is trained, we can access a lot of information regarding the model training and its accuracy compared to the provided dataset:

Custom model information about precision, recall, etc

Cloud-hosted model

Once the model is trained, we can call the REST APIs to deploy the model. Here is the part of the NiFi workflow in charge of this:

NiFi deploying the cloud-hosted custom model

Once the deployment is completed, the model will be automatically exposed through a REST API in Google Cloud (you don’t have to worry about the how and the where, Google takes care of it). The only information you need is the model ID and this is the information we are sending to the Raspberry Pi using the Google IoT Core commands with the ExecuteStreamCommand processor:

gcloud iot devices commands send \
    --command-data=ICN147321363982450688 \
    --region=REGION \
    --registry=REGISTRY_ID \
    --device=DEVICE_ID

In MiNiFi, we are receiving the command and storing the model ID in the cache:

Receiving the model ID through the Google IoT Core MQTT command

We can now perform the inference for each picture we capture by making an HTTPS call against the exposed API:

Inference in the cloud using the exposed API

Note the RouteOnAttribute processor that will check in which mode the device is configured. In this case, the device is configured in cloud mode.

The picture payload needs to be base64 encoded and the API to use is:

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth application-default print-access-token)" \
  -H "Content-Type: application/json" \
https://automl.googleapis.com/v1beta1/projects/${PROJECT_ID}/locations/us-central1/models/${MODEL_ID}:predict \
  -d '{
        "payload" : {
          "image": {
            "imageBytes" : "/9j/4AAQSkZJRgABAQAAAQ … "
          }
        }
      }'
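
The request body is easy to get wrong by hand, so here is a small Python sketch that base64-encodes the raw image bytes and wraps them in the same JSON structure as the call above. The function name is mine; the sample bytes are just the first four bytes of a JPEG header.

```python
import base64
import json


def build_predict_body(image_bytes):
    """Wrap base64-encoded image bytes in the JSON body of the predict call."""
    encoded = base64.b64encode(image_bytes).decode("ascii")
    return json.dumps({"payload": {"image": {"imageBytes": encoded}}})


# A JPEG file starts with FF D8 FF, which encodes to "/9j/" in base64.
print(build_predict_body(b"\xff\xd8\xff\xe0"))
```

This explains why the imageBytes value in the example above starts with /9j/: it is a JPEG picture.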

Edge exportable model

In this mode, once the TensorFlow model is trained, we just need to make an API call to export the trained model into Google Cloud Storage:

API calls to export the custom edge model into Google Cloud Storage

The API to use is:

curl \
  -H "Authorization: Bearer $(gcloud auth application-default print-access-token)" \
  -H "Content-Type: application/json" \
  https://${ENDPOINT}/v1beta1/projects/${PROJECT_ID}/locations/us-central1/models/${MODEL_ID}:export \
  -d '{
        "output_config": {
          "model_format": "tflite",
          "gcs_destination": {
              "output_uri_prefix": "${USER_GCS_PATH}"
          }
        }
      }'

In case you want to export a model that is optimised for the Edge TPU (we’ll talk about this in a bit), then you have to use:

curl \
  -H "Authorization: Bearer $(gcloud auth application-default print-access-token)" \
  -H "Content-Type: application/json" \
  https://${ENDPOINT}/v1beta1/projects/${PROJECT_ID}/locations/us-central1/models/${MODEL_ID}:export \
  -d '{
        "output_config": {
          "model_format": "edgetpu_tflite",
          "gcs_destination": {
              "output_uri_prefix": "${USER_GCS_PATH}"
          }
        }
      }'

With this API call, NiFi will receive a JSON payload with the GCS path to the tflite file representing your custom model. Here again we are using a Google Cloud IoT Core command to send this information to our device through MQTT:

gcloud iot devices commands send \
    --command-data=gs://.../model.tflite \
    --region=REGION \
    --registry=REGISTRY_ID \
    --device=DEVICE_ID

On MiNiFi, we are receiving this information and downloading the model directly on the device so we can do local inference:

Receiving the GCS path of the model through MQTT and downloading the model locally
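
Before downloading, the device has to split the received gs:// path into a bucket name and an object name. A minimal Python sketch of that step, with a hypothetical function name and a placeholder path:

```python
from urllib.parse import urlparse


def parse_gcs_uri(uri):
    """Split a gs://bucket/object path into (bucket, object_name)."""
    parsed = urlparse(uri)
    if parsed.scheme != "gs" or not parsed.netloc or not parsed.path.strip("/"):
        raise ValueError(f"not a gs://bucket/object path: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


# Placeholder path, not the actual export location of my model.
print(parse_gcs_uri("gs://my-bucket/models/cookies/model.tflite"))
```

Rejecting anything that is not a gs:// path is a cheap safeguard, since the value arrives as a free-form command payload.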

Now that we have the TensorFlow Lite model downloaded on the Raspberry Pi, we can perform the inference on every captured image:

Inference on the edge using the downloaded TensorFlow Lite model

To execute the model locally, I’m executing a Python script that you can find here.

Boosting your device with the Coral Edge TPU

Coral has been generally available (GA) since the 22nd of October 2019! Coral is a hardware and software platform for building intelligent devices with fast neural network inferencing.

At the heart of the devices is the Edge TPU coprocessor. This is a small ASIC built by Google that’s specially-designed to execute state-of-the-art neural networks at high speed, with a low power cost. The Edge TPU is capable of performing 4 trillion operations (tera-operations) per second (TOPS), using 0.5 watts for each TOPS (2 TOPS per watt).

Edge TPU benchmark by Google

In my use case, I’m using the USB accelerator, plugged directly into the Raspberry Pi.

Coral USB accelerator

Using the custom ML models I trained on Google Cloud Vision, I can easily compare the TensorFlow “invoke” time with and without the Edge TPU:

  • When not using the Edge TPU optimised model:
TF Lite invoke time when not using the Edge TPU — about 127ms/image
  • When using the Edge TPU and the optimised model:
TF Lite invoke time when using the Edge TPU — about 9ms/image

We get about 127ms without using the Edge TPU while we only need 9ms when using the Edge TPU on a low latency optimised model generated for my cookies use case with Google Cloud Vision AutoML.

Preliminary results

So far we have sent the pictures through MQTT and performed the classification inference in two distinct branches of the MiNiFi workflow (in other words: picture ingestion and picture inference are done in parallel). Here are the results we get (processing time is the duration between the moment we take the picture and the moment we get the label and the confidence score of the classification; inference time is the duration of the “invoke” call to get the classification using the TensorFlow Lite model):

  • cloud mode (inference in the cloud through HTTPS):
    – Processing time: about 6 seconds per image
    – Inference time: about 2.5 seconds per image
  • edge mode (without the Edge TPU):
    – Processing time: about 750 milliseconds per image
    – Inference time: about 127 milliseconds per image
  • edge mode + TPU (with the Edge TPU):
    – Processing time: about 500 milliseconds per image
    – Inference time: about 9 milliseconds per image
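
To put these figures in perspective, the short Python sketch below computes, for each mode, the time spent outside the inference itself and the share of the processing time taken by the inference. It only uses the approximate values listed above; the variable and function names are mine.

```python
# Approximate timings from the list above, in milliseconds.
results_ms = {
    "cloud": {"processing": 6000, "inference": 2500},
    "edge": {"processing": 750, "inference": 127},
    "edge + TPU": {"processing": 500, "inference": 9},
}


def overhead_ms(entry):
    """Time spent outside the inference call (capture, I/O, workflow, etc.)."""
    return entry["processing"] - entry["inference"]


def inference_share(entry):
    """Fraction of the processing time spent in the inference call."""
    return entry["inference"] / entry["processing"]


for mode, entry in results_ms.items():
    print(f"{mode}: overhead {overhead_ms(entry)} ms, "
          f"inference share {inference_share(entry):.1%}")

print(f"invoke speed-up with the Edge TPU: {127 / 9:.1f}x")
```

The overhead comes out at about 3500 ms, 623 ms and 491 ms respectively. With the Edge TPU, the invoke call is about 14 times faster but represents less than 2% of the processing time, so most of the remaining time is spent elsewhere in the workflow.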

Monitoring and auto-labelling

What we have done so far is great but, once we have made the inference on the Raspberry Pi, we take the action to keep the cookie on the production line or not (in my case, I switch the red or green lights on and off) and we forget about the inference results. The next step is to perform the inference before sending the pictures through MQTT so that we can send the inference results along with the pictures. Picture inference and picture ingestion are then done sequentially. This has two key benefits:

  • We can collect additional data to deploy monitoring dashboards in order to have accurate information about how our custom models are performing on our devices
  • We can introduce auto-labelling of the images: if the confidence score of the inferred label is over, for example, 90%, we could auto-label the image when ingesting it into the Google Cloud Vision dataset so that minimal human interaction is required when it comes to labelling: only the images representing new types of defects might need human labelling.
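
The auto-labelling rule boils down to a threshold check. Here is a minimal Python sketch, assuming the score is a value between 0 and 1 and that a labeled image is written as a path followed by its label; the function name and the sample paths are hypothetical.

```python
def dataset_row(gcs_uri, label, score, threshold=0.90):
    """Return the dataset row for an image, labeled only if the score is over the threshold."""
    if score > threshold:
        return f"{gcs_uri},{label}"
    # Below the threshold, the image is ingested unlabeled for human review.
    return gcs_uri


# Placeholder paths and scores, for illustration only.
print(dataset_row("gs://my-bucket/images/0001.jpg", "OK", 0.97))
print(dataset_row("gs://my-bucket/images/0002.jpg", "NOK", 0.62))
```

The 90% threshold is the example value from the text; a stricter value reduces the risk of feeding wrong labels back into the training set.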

I won’t go into the details of the required changes in the workflows but on MiNiFi it looks like this:

MiNiFi workflow when sending inference results along with pictures through MQTT

Once we receive all the information in NiFi, we can add this information into Stackdriver monitoring dashboards and see in real-time how our custom models are performing:

Processing time and score for cloud mode versus edge mode

In the above picture, we can see how the processing time and the confidence score are changing while going from the cloud mode to the edge mode after updating the device configuration. We clearly see that the processing is much faster but we are losing a little bit in accuracy (still above 90% though!).

We can also compare the results when using the Edge TPU or not while the device is configured in edge mode:

Processing time and score for edge mode with and without Edge TPU

In the above picture, we can see an interesting result: when using the Edge TPU with the Edge TPU optimised model, we get a lower processing time while also getting a higher confidence score for the inference. No reason not to use it!

Conclusion

First, thank you for reading this very long story… Then, here is the conclusion: thanks to Apache NiFi, MiNiFi, the products of the Google Cloud Platform and the Coral products, we implemented an efficient end-to-end solution to run custom ML models on the edge while continuously refreshing our models with new data to improve their accuracy over time, with no code and minimal human interaction. I’ll leave it to your imagination to transpose all of this to your very own use case!

The slides of the talk, the recording as well as code snippets, custom processors, workflows, scripts, etc., will be added in the coming days to this GitHub repository. As usual, feel free to comment or ask questions.

NiFi with OIDC using Terraform on the Google Cloud Platform

When I present Apache NiFi during talks or meetings, I have to quickly start and stop instances. It’s very easy to do it on your own laptop with Docker, but it’s even better to have it running in the cloud and use IaC (Infrastructure as Code).

It’s very easy to start Apache NiFi on the Google Cloud Platform in a Compute instance, expose it on the Internet and have everything running. It just takes two commands and a few seconds… In your GCP project, start the Cloud Shell console and run the two commands below:

gcloud beta compute instances create-with-container my-nifi-instance --tags=nifi --container-image=apache/nifi
gcloud compute firewall-rules create allow-nifi-unsecured --action=ALLOW --rules=tcp:8080 --target-tags=nifi

You just started a Compute instance with the latest version of Apache NiFi and exposed it to anyone on the internet. You just need to get the external IP of your instance and you can access the UI on http://external_ip:8080/nifi.

It’s great, but you need to understand that your instance is not secured and is exposed to anyone. In short… you should never do that. At the very least, get your IP and restrict the access to the instance to your own IP.
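
As a sketch of that restriction, the Python helper below builds a gcloud command equivalent to the firewall rule above, but limited to a single source address through the --source-ranges flag. The rule name and the 203.0.113.7 address (a documentation-only IP) are placeholders I chose; replace them with your own values.

```python
import ipaddress


def restricted_firewall_command(my_ip, rule_name="allow-nifi-from-my-ip"):
    """Build a gcloud command that only allows my_ip to reach NiFi on port 8080."""
    address = ipaddress.ip_address(my_ip)  # raises ValueError on invalid input
    prefix = 32 if address.version == 4 else 128
    return [
        "gcloud", "compute", "firewall-rules", "create", rule_name,
        "--action=ALLOW",
        "--rules=tcp:8080",
        "--target-tags=nifi",
        f"--source-ranges={address}/{prefix}",
    ]


print(" ".join(restricted_firewall_command("203.0.113.7")))
```

The printed command can be run in Cloud Shell instead of the unrestricted rule shown earlier.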

But please… Security must be a first class citizen and the Apache NiFi community is really doing an amazing job to give you the best options to secure your instances.


In this post I show you how to use Terraform to start a secured NiFi instance configured to use OpenID Connect (OIDC) for authentication.

Note — I assume you have a domain that you own (you can get one with Google). It will be used to map a domain to the web interface exposed by NiFi. In this post, I use my own domain: pierrevillard.com and will map nifi.pierrevillard.com to my NiFi instance.

Disclaimer: the steps below should not be used for a production instance. I’m just using them to start a secured instance with a single user access for short demos (there is none of the configuration one would expect for a production or long-lived instance).


OAuth Credentials

The first step is to create the OAuth credentials (at the time of writing, this cannot be done using Terraform).

Once the credentials are created, you will get a client ID and a client secret that you will need in the Terraform variables.

By creating the credentials, your domain will be automatically added to the list of the “Authorized domains” in the OAuth consent screen configuration. It protects you and your users by ensuring that OAuth authentication is only coming from authorized domains.


Deploy NiFi with Terraform

I’ll go into more detail about what I’m doing in the next sections; below are just the commands to deploy everything. Go to your GCP project and start the Cloud Shell console.

git clone https://github.com/pvillard31/nifi-gcp-terraform.git
cd nifi-gcp-terraform/gcp-single-secured-nifi-oidc/
terraform init
terraform apply

When applying the Terraform configuration, it’ll ask for some information:

  • The token to be used between the NiFi CA and the NiFi instance to generate certificates. You can use a random string which is at least 16 bytes long.
  • The Google email address of the user that will be the initial admin for the NiFi instance.
  • The OAuth Client ID and Secret you got before.
  • And the sub-domain that you will configure and use to access your NiFi instance. In my case: nifi.pierrevillard.com.
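
For the first item, one way to produce a suitable token is sketched below in Python. It only assumes what is stated above, namely that any random string of at least 16 bytes is acceptable; the function name generate_ca_token is made up for illustration.

```python
import secrets


def generate_ca_token(num_bytes: int = 16) -> str:
    """Return a random hex token built from `num_bytes` random bytes.

    Hex encoding doubles the length, so 16 random bytes give a
    32-character string, comfortably above the 16-byte minimum.
    """
    if num_bytes < 16:
        raise ValueError("use at least 16 random bytes")
    return secrets.token_hex(num_bytes)


if __name__ == "__main__":
    # Paste the printed value when `terraform apply` asks for the token.
    print(generate_ca_token())
```

Any other source of randomness would do as well; the point is simply to avoid a short or guessable value.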

Access NiFi

Once the Terraform configuration is applied, you need to map your subdomain to the static IP of the NiFi instance. Go to the Compute Engine page of your GCP project and get the external IP of the NiFi instance.

Once you have the external IP, go to your DNS provider page and add an ‘A’ record to your DNS records with the subdomain pointing to the external IP. The exact steps depend on your DNS provider.

Once done, you should be able to access the NiFi UI using your subdomain on the port 8443: https://nifi.pierrevillard.com:8443/nifi.

You will most probably get a warning from your browser because of the untrusted certificate authority. That’s because we generated a CA certificate to sign the NiFi certificate. You can ignore the warning for a demo but otherwise you should use a trusted CA certificate:

Once you proceed to the website, you’ll be redirected to the Google authentication page asking for your credentials. That’s because we configured NiFi to use OpenID Connect to delegate the authentication to Google. At this point, you can only authenticate using the Google address you provided as initial admin for NiFi:

Then you are connected as the user and can access the canvas:

Before you can design your first workflow, you’ll need to go to the “Policies” menu to grant yourself the required permissions. You can also go to the “Users” menu to add additional users who will be able to authenticate on the UI and give them the appropriate permissions.


Details

The Terraform configuration files are on GitHub:

  • provider.tf to define the Google Cloud provider with the basic GCP project information
  • network.tf to create a network and subnetwork dedicated to the NiFi CA instance and the NiFi instance
  • firewall.tf to create the rules to allow internal communications, SSH access to the instances and access to the NiFi instance on the port 8443
  • nifi-ca.tf to install the NiFi Certificate Authority (provided with the TLS toolkit) in server mode in order to create a certificate authority and sign the certificates for the NiFi instance
  • nifi.tf to install the NiFi instance, get the certificate from the NiFi CA, generate the keystore and truststore and configure the NiFi instance to be secured and use OpenID Connect for authentication
  • variables.tf to define some variables to be used to customize the deployment (more variables could be added)

Remember, this is a basic deployment of NiFi but you have a secured instance with Google delegated authentication.

Note — To delete all the created resources, you can use ‘terraform destroy’.

There is much more to do to get closer to a production-ready deployment, but this gets you started playing with NiFi on the Google Cloud Platform. I’ll add more features in my next posts (the immediate next step will be to add a secured NiFi Registry instance that is connected to the Google Cloud Source Repositories).

Thanks for reading, and feel free to ask questions or comment on this post!

NiFi & NiFi Registry on the Google Cloud Platform with Cloud Source Repositories


This post is about quickly and easily deploying an unsecured instance of NiFi and an unsecured instance of the NiFi Registry which uses the Cloud Source Repositories service as backend for the flow persistence provider.

The objective is to quickly deploy NiFi and the NiFi Registry, connect the two together, version the workflows in the Source Repositories, and be up and running quickly to start building workflows. This is not suitable for production deployment as we are not securing the instances (I’ll talk about that in another post).

This story is also about a new feature in NiFi Registry 0.4.0 (NIFIREG-209), which allows the NiFi Registry to rebuild all the metadata from an existing Git repository of flows. It’s a very nice feature when you start and stop NiFi instances on the fly and still want easy access to your versioned flows. With this feature, we could even run the NiFi Registry in Google Cloud Run and have the production instances of NiFi pull the versions of the flows from the NiFi Registry exposed by Google Cloud Run. By doing that you would get the advantages of serverless. If you are interested in Google Cloud Run, you might also like this post about running NiFi workflows in Cloud Run.


Setup Source Repository

I start by creating a new project in my Google Cloud Platform console. I call this new project ‘nifi-registry’. Once the project is created, I go into Source Repositories. If it’s your first time, click on ‘Get started’ and ‘Create repository’.

Source Repositories is the Google Cloud offer to get free unlimited private Git repositories to organize your code in a way that works best for you (you can also mirror code from GitHub or Bitbucket repositories to get powerful code search, code browsing, and diagnostics capabilities). It also nicely integrates with CI/CD tools.

In my case, I create a new repository that I call ‘nifi-flow-repository’.

Let’s now set up the SSH key to allow access to the repository.

$ ssh-keygen -t rsa -b 4096 -m PEM -C "NiFi Registry"

Generating public/private rsa key pair.
Enter file in which to save the key (~/.ssh/id_rsa): ~/temp/id_rsa
Enter passphrase (empty for no passphrase): 
Enter same passphrase again: 
Your identification has been saved in ~/temp/id_rsa.
Your public key has been saved in ~/temp/id_rsa.pub.
The key fingerprint is:
SHA256:/hu6FZLvcvDigP4ixPFPEGzfpY1RHOoTRytjqYOoYHo NiFi Registry
The key's randomart image is:
+---[RSA 4096]----+
|   .     o+.     |
|    +   .+o.     |
|   . o .B*o      |
|  .....++*.      |
|.o.o..o S .      |
|+.o ...o.+ .     |
|o.E .o. .o+      |
| .... ..o=o.     |
|   ..o..+=+.     |
+----[SHA256]-----+


$ ls
id_rsa     id_rsa.pub

For this demo to work, we generate a PEM-encoded key and use an empty passphrase (again, this is not ideal for production). Once done, you can register the SSH key with Google Cloud (a link is available after you create the repository). You just have to give the key a name and paste the content of the generated id_rsa.pub file.

Start the NiFi Registry

We can now focus on starting the NiFi Registry. To be up and running very quickly, I’m going to rely on the Docker image provided by Apache NiFi and use it in a simple Compute Engine instance with Docker enabled.

In Compute Engine / Instance templates, you can create a new template. Here is my setup with the parameters I changed (adapt it to your needs):

  • Name : nifi-registry
  • Check “Deploy a container image to this VM instance”
  • Container image : apache/nifi-registry
  • Go into “Advanced container options”
  • In the volume mounts (use Directory as volume type) :
  • Go into “Management, security, disks, networking, sole tenancy”
  • Add the below startup script :
#! /bin/bash

# This script is used when starting a docker image based GCE instance
# of the NiFi Registry. It is intended to configure the NiFi Registry
# so that the persistence provider is the Google Cloud Source Repo.

# Docker volumes (directory type)
# /tmp/config => /home/nifi/.ssh - Read only
# /tmp/ssh/id_rsa => /id_rsa - Read only
# /tmp/nifi-flow-repository => /nifi-flow-repository - Read/Write
# /tmp/providers.xml => /opt/nifi-registry/nifi-registry-0.4.0/conf/providers.xml - Read only

# Note that 1000 is uid/gid for NiFi user/group in the Docker container

# create directory for SSH keys
mkdir /tmp/ssh

# private SSH key to authenticate against the Google Cloud Source Repo
# its associated public key needs to be registered on Source Repo
# this is the content of the id_rsa file we generated, change it with yours!
touch /tmp/ssh/id_rsa && chmod 600 /tmp/ssh/id_rsa
cat <<EOF >> /tmp/ssh/id_rsa
-----BEGIN RSA PRIVATE KEY-----
MIIJJwIBAAKCAgEAxRVsCHEUnKJXZnJ8GZVb/D0bvCrzK/p8jYiuleUqCgrqJ+2D
hocKkLqPKU5yiiA11vDCgrn7GBkWUH+Naj6rc9qZviwJtuZFjfKt+gg9++JLsnQn
8TdfbZ7nKZ2fh397Yr50mLP9wJw6A0nd68Y9YowRID64ZJ+kfqtnr5gaU5wI6/j+
UEV8QO2hpQfJ+hI0TUe82kY3l21J4FcLGbKsZSwMjRDRIG9fly4adEVsPq+WZFJ1
6q8ZsGfLpArCORQiM81mBFTeeLn65FZwDEsAIko3Fv+OVBjv+YtpaUOL4QwM9E3X
9v+0WqkKG0MYm12FjHkulIdHbO9gVIXKM8sIyI+J49fxWiLnb+HiYExCvdgaKCAf
eR12zIjTXHrX8YHF5uxfE0oCMgPcZIcsSVO0zjNNKniJy4AWU46hxB8USI7Kf9jD
i/YVRmDi67mXSi6z8FjamccIWg8CnWVkLTr771ERZATZAn7YDY7Zu8Z+AXE6U12o
7hkcB940B+hHnHrB32iKiOev+NLR+NCde1YUBHfaw6vmmsBwrmO9H/cSGz9L1aC/
seSOBrnWef/YZXAXtMb1maSGpVEW+74uR05UL5Bt5xlB+MAQGRcDWkMJ08teqYGZ
IOgqqeESihXq2Suk1+nMhO/kUHdUBHVHXBmQTqtRMKjMyyRaM/rU/BwF0PECAwEA
AQKCAgBEFI1YMS/sl8vXUO77q3O2I6nkC3YrGAFUpwWgNCSciX6vwkVwAFAvBLMV
ksrQWiYfFNYscHCDt47Uyesg63swry/y2KWWb99RFKbyu0wmKdr6T1PI6NbnOEAk
SRXlqa0GxEHkyjB7C7yijC7EFpv34ei8mc36vIcHVtCfgMx/W2Rdl4rKUeaFS1FO
f+1OnGFC3OgpAp6Lia4+d/MWsnkJDflb9ZY4PSDpSXzL83tcAC3UhAPFySz3mjNn
fGvxaboS6P43utWLILvBivZz2Ki4L6juIeOZu2+SZY1JVpMIb7A96HIVGenEc6ZR
GpFyghDzIJ1to3mR8PL236ykzZs/iRbRGOvHSHNFiAmKMIXqeYe+Q6u/9mGVo68J
Vbw0PeTWaaPnnBM3iVi2d3STGsI6x2bTtosDyRS5YLNztj0Y8mOSWQz8JvnTcikM
lKBcSbA3K/ophzSyjk8sN65HqM1JQ0v/bK6w8TtZBLThPwcqRtbvYQBCQC7g74Oe
5KAuJRDTKDSxkLlD9AX0hdMIMIVKNd5h8Sx7vlX6DZ/5PdLT87eIS39mQlXNjE6W
kvGPte1YlTqWFECmxHlxvSEMB6txEzUrfEVLpTdKkpnygQFJfZdKzTiIsFiqKn9s
s6IAuzyHBJ6MMUdstZ8DMMnE/uXWU3ouUemSLcRiPMXi5AlZkQKCAQEA+EuQ75un
9PS/D6pCcOrZaodK0J2Xz4KX2VkuEJeP/KmDAFwcB0HzpQO6zQglIf8Sv9r3/g10
qk6pt5Y+gqBnpQOV/uUgN7U0lTMAnxx4l3RxXKZW3cqEYL326ouzRKDjz7TxYiWK
T+RT4QlK0jm7NfqZ8trqwi4dB4Nz5JA4MItoSinyWYzRfnaNH8jBxYmRyDGWQLsq
y2Dep6AZ2GPez473xl6TdwTc47UnDYbyuXdhK4yV+Jr9quYUkda+gFVlh7ZEK/vi
WsBoh0IjG07C4m8rNdxHnfplH9MIFo+I2u+hk6Ot4hpTc9b+Vj93FidSnboOEjDh
YD6MM1A8gVJdtQKCAQEAyzMJVHOAWq0F14h6vUY9m8lkw3KBZorfTsDD39kG3P7C
lh/VCCqk7Uz6drkPc6Gb/xp1LgCDhdxxNISos/92IINFwTmg1bQpaCBsfgkM+r2g
S87hUdCL3NCLwsrS67oQjalEAVu8TZ2obCZ48EgasiY48tq7SrQ7+tW7PSjnO5pv
pM661m5Gmp0/y7+0CkPg2JyGp4mwlYY5PgHUckpjwl7dHiDqTSYypOH0rX8i0UF7
NlOpWIFK1/o3K5YzenJwLIm9tdhG+AJ4ZXUOL+ihoUp+uyac33jCAsg34n2spEgi
F/5V8SZ/oYImBeqZJBSf/eOllAVQMHoNyU3F7SILzQKCAQAul6+I5wKresnnnKF/
IvLNhLnLT+dO70ijZpK4VliUpxKIHMC9s+iOjJafJuog6QaRrftfVxMPald6tdzh
EkygsH2TKVfUXFKTtNBnCyat9RHYuvYOBJS2uq788F4hgLd/IIszSawctdHvppi9
vkudI3uEEQSAj7qu0EINH+sLYP2e/SQXHc+qFYEB9+A0u0357SQu3XB4XaMqfWac
LpF/DWr9dH3jlawFpta/ORWPLBG15Fm4Hw1+5lHx1ARHfL7iqpc8UbX2Jaj3yLdh
xnPXndjT8JQX1wbm4+jeouyheNovJEXa6enDERMFCD/GjnZ4VpORYk7IirQwZNwV
wGkJAoIBACYlU5gOAseC+bjHfzsvB3vKZ+clBNPKODehimPoaxhrnv3txeE9mC6Q
J+jHvvXXHeDbB6p2IDqt8naIfN8lkvhxjFPEzMOxiaBpjdRvQIeFt6Tjlnr0an0u
jT4pM0VbbaYaY5DZttTfRvHemw+IibJt6Hz2wPg6M5RYMUk+94HB8TmAMaT1mL2H
zaUjPNo8eeZQJBsphwPa6b4RO9+kxWuEwX/ZcALBq/o2DOfRGSktYMMHG6BozTMU
Xu0IymsvNo214e5URqZiWFW7jslBo64SvQ3HJuXw1oMNMSiMrS8992CHt3yI3Kbm
NtsfelZCpPJVnQzXnoErOJFUz1Y/8PUCggEASI2Uky/TDLCU5Inm2hN1zzozF/kh
pvKks4Kd93BSiMWDHg07HtfkGPBINQdwGScLuTK63uHEuVTB1e1VM+Kd+FqoddoA
xEfdFKoDMgyhE2KNg9iiePbkeXzCCY9yFD2xQ+zfMdqd/755Fg356IfETSx59WzO
vNTQShrwj5zOE1H3wl+YYsAWA1AiHslzwVp5pO26KhQLMO90q5j2g7gJEWOfhUDi
bkhJu3rXdJpiYOCngy0vKiSFYrvFfYh4hZ4+mL/TzWIuJtC0zmSetk1VwOYBkwJZ
Uo1bAaMdFJMC/QAWiJv95jGuusV39nT7pxd93sW1Iiv9OXnsxj8OeShEVg==
-----END RSA PRIVATE KEY-----
EOF

# clone the Google Cloud Source Repository
cd /tmp
ssh-agent bash -c 'ssh-add /tmp/ssh/id_rsa; git clone ssh://admin@pierrevillard.com@source.developers.google.com:2022/p/nifi-registry-245014/r/nifi-flow-repository'
chmod 755 /tmp/nifi-flow-repository
chown -R 1000:1000 /tmp/nifi-flow-repository

# Create the providers configuration for the NiFi Registry
# no user/password because we use SSH authentication
cat <<EOF > /tmp/providers.xml
<providers>
  <flowPersistenceProvider>
    <class>org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider</class>
    <property name="Flow Storage Directory">/nifi-flow-repository</property>
    <property name="Remote To Push">origin</property>
    <property name="Remote Access User"></property>
    <property name="Remote Access Password"></property>
  </flowPersistenceProvider>
  <extensionBundlePersistenceProvider>
    <class>org.apache.nifi.registry.provider.extension.FileSystemBundlePersistenceProvider</class>
    <property name="Extension Bundle Storage Directory">./extension_bundles</property>
  </extensionBundlePersistenceProvider>
</providers>
EOF
chown 1000:1000 /tmp/providers.xml

# .ssh/config and .ssh/known_hosts files
mkdir /tmp/config
cat <<EOF >> /tmp/config/config
Host source.developers.google.com
  HostName source.developers.google.com
  IdentityFile /id_rsa
EOF
ssh-keyscan -p 2022 source.developers.google.com >> /tmp/config/known_hosts
chown -R 1000:1000 /tmp/config

# change ownership of the private key to allow access to 'nifi' user inside container
chown 1000:1000 /tmp/ssh/id_rsa

Note 1 — we are using templates to get up and running very quickly each time you want to start a new instance with the same configuration.

Note 2 — the above approach is not recommended as we are copying/pasting the private key in the startup script but this is due to the restrictions coming with the Container Optimized OS used for this demo. In a better world, we would use Cloud Build to have our own NiFi Registry image and use it instead. Or we could deploy the public image on Google Kubernetes Engine and use secrets.

Once your template is created you can open it and click “Create VM”:

Then you can give a name to your instance (let’s say ‘nifi-registry’) and start it. You should have an instance up and running:

After configuring the proper Firewall rule to allow access from your personal network to the instance on the port 18080, you should be able to access the NiFi Registry at http://<external IP>:18080/nifi-registry :

You can go in Settings (top right) and create a new bucket:

You now have a NiFi Registry up and running and you have initialized your first bucket. We can now deploy a NiFi instance, connect it with the Registry and create our first workflow.

Start a standalone NiFi instance

It’s very easy! Just go in Compute Engine / VM instances and click “Create instance”. Then just give a proper name to your instance and configure it to use the NiFi Docker image:

Start your VM and wait a few minutes. After configuring the firewall rule to allow access from your personal network, you should be able to access NiFi on port 8080:

Go into the top-right hamburger menu and go into Controller Settings. Then go into the Registry Clients tab and click the + button to configure your registry:

You can now add a Process Group into the canvas, right click on it and start versioning:

You will notice that we can see the bucket we created in the registry. We can give a name to our workflow, a description, and a commit for this version.

Once we click save, we have the confirmation that the workflow has been correctly versioned:

We can check in our Cloud Source Repository that we do have data:

That’s it. You can now create a more complex workflow and commit the new version into the Registry, this will be saved into your repository. Even better, if you kill your NiFi Registry instance, and start a new one, you will be able to keep working and pull all the workflows you previously stored in the repository — all the metadata will be generated from the repository data at startup.

There is much more to come about NiFi on Google Cloud, stay tuned! Thanks for reading and feel free to comment and/or ask questions.

Deploying NiFi workflows on Google Cloud Run

Google Cloud just launched a new managed service called Cloud Run, which is currently in public beta.

Cloud Run is a managed compute platform that enables you to run stateless containers that are invocable via HTTP requests. Cloud Run is serverless: it abstracts away all infrastructure management, so you can focus on what matters most: building great applications. It is built from Knative and lets you run your containers fully managed with Cloud Run.

The huge benefit of this approach is that if there is no call to your service, there won’t be any container running and you won’t be charged. When a call is received, the container is instantiated to serve the request. If there are new requests, the container remains alive until there are no more requests to answer. Besides, if traffic increases, your app will scale and more containers will be instantiated.

Also, when deploying a new version (revision) of your service, requests are automatically routed as soon as possible to the latest healthy service revision. This makes the whole CI/CD and service lifecycle much easier.

I wanted to play with this new service using NiFi: the idea is to have a workflow starting with HandleHttpRequest processor and ending with HandleHttpResponse processor to easily develop “functions” and deploy the web services in Cloud Run.

Note – everything used for this demo is available on this GitHub repo.

The workflow

Here is a simple workflow I developed and deployed on Cloud Run: this is a workflow expecting CSV data as input and converting the CSV data into JSON.

I want to emphasize that this is a very simple case. But think about all the great features you can use in NiFi following this approach. Possible use cases are pretty much infinite.

Basically, once the workflow is running I can use a POST HTTP request to get my JSON:

$ cat test.csv
name,company
Pierre Villard,Google

$ curl --data-binary "@test.csv" -H "Content-Type: text/csv" -X POST http://localhost:9090/
[ {
  "name" : "Pierre Villard",
  "company" : "Google"
} ]

This is a very simple workflow:

  • HandleHttpRequest – will start listening on a given port for HTTP(S) requests
  • ConvertRecord – will convert my CSV data into JSON using the header to infer the schema
  • HandleHttpResponse – returns the result to the user

Simple workflow to convert CSV to JSON using an HTTP endpoint
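
To make the ConvertRecord step concrete, here is a rough Python equivalent of what the flow does to the payload: read the CSV, take the field names from the header line, and emit a JSON array. This only illustrates the transformation, not how NiFi implements it; the function name csv_to_json is hypothetical and every value is kept as a string.

```python
import csv
import io
import json


def csv_to_json(csv_text: str) -> str:
    """Convert CSV text to a JSON array, using the header row as field names."""
    reader = csv.DictReader(io.StringIO(csv_text))
    records = [dict(row) for row in reader]
    return json.dumps(records, indent=2)


if __name__ == "__main__":
    print(csv_to_json("name,company\nPierre Villard,Google\n"))
```

In the actual workflow this logic sits between the request and response processors, so the HTTP handling never has to know about the record format.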

Considerations for the container

The container must be stateless and listen for HTTP requests on $PORT that will be passed to the container as an environment variable. Have a look at the container runtime contract.

In the NiFi world, that means that the property defining the listening port in the HandleHttpRequest processor must accept Expression Language. Unfortunately, that is not the case and I submitted a fix with NIFI-6144. However, in Cloud Run container instances, the PORT environment variable is always set to 8080 (but for portability reasons, your code should not hardcode this value). This means, you don’t strictly need the fix I submitted to play with Cloud Run and NiFi, but it’s safer if you do.
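
The port contract can be illustrated outside NiFi with a few lines of Python. The sketch below reads PORT from the environment and falls back to 8080 only when the variable is absent; resolve_port is a hypothetical helper name, not part of any Cloud Run SDK.

```python
import os
from typing import Mapping, Optional


def resolve_port(env: Optional[Mapping[str, str]] = None, default: int = 8080) -> int:
    """Return the port to listen on, taken from the PORT variable."""
    env = os.environ if env is None else env
    port = int(env.get("PORT", default))
    if not 1 <= port <= 65535:
        raise ValueError(f"invalid port: {port}")
    return port


if __name__ == "__main__":
    print(f"listening on port {resolve_port()}")
```

Making the listening port of HandleHttpRequest accept Expression Language serves the same purpose: the value comes from the environment rather than being hardcoded in the flow.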

The container, when started, must be able to serve the request within 4 minutes, which means that we want a container that starts as quickly as possible and has a low footprint. To do that, I chose to work with the MiNiFi Docker image instead of the NiFi one.

Converting the workflow for MiNiFi

Once the workflow is developed, you can save it as a template (XML file) and use the MiNiFi toolkit to convert the XML file into a YAML file that will be usable by MiNiFi.

# command to convert the template into yml file for MiNiFi
$MINIFI_HOME/bin/config.sh transform templateConvertRecord.xml config.yml

You can have a look at the YAML file on the GitHub repository.

Building the container

By default, the MiNiFi Docker image comes with a limited set of features and it’s up to you to add the elements you need to run your workflow if required. In my case, I added:

  • a custom build of the standard processors NAR to include my fix,
  • the HTTP context NAR that is required when using HandleHttpRequest/Response processors,
  • and the Record serialization NAR required for the ConvertRecord processor.

Here is my Dockerfile:

FROM apache/nifi-minifi:latest
USER root

ENV MINIFI_HOME=/opt/minifi/minifi-0.5.0

# flow definition generated by the MiNiFi toolkit
COPY config.yml $MINIFI_HOME/conf/config.yml
# additional NARs required by the workflow
COPY *.nar $MINIFI_HOME/lib/

RUN chown -R minifi:minifi $MINIFI_HOME

USER minifi

You then need to create your image and push it to the Container Registry service in the Google Cloud Platform. You can have a look at the documentation here.

git clone git@github.com:pvillard31/nifi-cloudrun-example.git
cd nifi-cloudrun-example
gcloud auth configure-docker
# docker build -t [HOSTNAME]/[PROJECT-ID]/[IMAGE]:[TAG] .
docker build -t eu.gcr.io/my-gcp-project/minifi-csvjson:0.0.1 .
# docker push [HOSTNAME]/[PROJECT-ID]/[IMAGE]:[TAG]
docker push eu.gcr.io/my-gcp-project/minifi-csvjson:0.0.1

Deploy in Cloud Run

Once your Docker image is pushed in the Container Registry, you can go to the Cloud Run service and deploy your service!

Click on Create Service and fill the information:

Create a service in Cloud Run

Once your service is deployed, you will get a service HTTPS URL exposing your service:

I can now directly send requests to the exposed service:

$ time curl --data-binary "@test.csv" -H "Content-Type: text/csv" -X POST https://minifi-csvjson-t546z2l6aq-uc.a.run.app
[ {
  "name" : "Pierre Villard",
  "company" : "Google"
} ]
real	0m0.725s
user	0m0.036s
sys	0m0.026s

And I can also monitor my service in Stackdriver:

Conclusion

That’s it! You know how to deploy a new service in Cloud Run using the features of NiFi to develop your workflow and using the MiNiFi Docker image for running your containers.

Hopefully this demonstrates how easy it is to create and deploy a service and everything is managed for you in terms of scaling and lifecycle… and you’re paying only for what you really use!

In combination with the NiFi Registry and all the tools you have on the Google Cloud Platform you can have a very neat pipeline to automate all of it and deploy new services in seconds!

As always, feel free to ask questions and/or leave a comment.

NiFi 1.8+ – Revolutionizing the List/Fetch pattern and more…

If you read my post about List/Fetch pattern and if you’re using this approach for some of your workflows, this new feature coming with NiFi 1.8.0 is going to be a revolution.

A quick recap about the context: in NiFi, unless you specifically do something to make the nodes of a NiFi cluster exchange data, a flow file will remain on the same node from the beginning to the end of the workflow. For some use cases, it is necessary to load-balance the flow files among the nodes of the cluster to take advantage of all the nodes (like the List/Fetch pattern).

Before NiFi 1.8, the only way to load balance data in a NiFi cluster was to use the Site-to-Site (S2S) protocol with a Remote Process Group (RPG) connected to the cluster itself through an input/output port. In addition, S2S forces you to define the input/output port at the root level of the canvas. In a multi-tenant environment this can be a bit annoying and can make the workflows a little more complex.

What’s the revolution with NiFi 1.8+? You can now load balance flow files within the cluster by configuring it directly on the relationship between two processors. It could sound like a minor thing, but that’s HUGE! In addition, you have a few options to configure the load balancing, which opens up possibilities for new use cases!

The List/Fetch pattern is described in my previous post. In short, a first processor (ListX), running only on the primary node, lists the available data on X and generates one flow file per object to retrieve (the flow file has no content but carries the metadata needed to fetch the object). The flow files are then distributed among the NiFi nodes, and the FetchX processor, running on all nodes, actually retrieves the data. This way you ensure there is no concurrent access to the same object and you distribute the work across your cluster.
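
As a toy model of the pattern, the Python sketch below separates the listing step (metadata only) from the fetching step (content retrieval) and hands each node a distinct share of the listing. Everything here is an assumption made for illustration: the source is a plain dictionary, flow files are dictionaries, and the function names are invented.

```python
from typing import Dict, List


def list_objects(source: Dict[str, bytes]) -> List[dict]:
    """ListX step (primary node only): one content-less flow file per object."""
    return [{"filename": name, "content": None} for name in sorted(source)]


def fetch_object(flow_file: dict, source: Dict[str, bytes]) -> dict:
    """FetchX step (any node): use the metadata to retrieve the content."""
    return {**flow_file, "content": source[flow_file["filename"]]}


def list_fetch(source: Dict[str, bytes], node_count: int) -> List[List[dict]]:
    """Run the pattern and return the fetched flow files per node."""
    listing = list_objects(source)
    # Give each node every node_count-th flow file, so no object is
    # assigned to two nodes.
    shares = [listing[i::node_count] for i in range(node_count)]
    return [[fetch_object(ff, source) for ff in share] for share in shares]
```

Because the listing happens once and each flow file goes to exactly one node, no object is retrieved twice, which is the property the pattern is designed to give you.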

List/Fetch pattern before NiFi 1.8.0

If we have a project A retrieving data from a FTP server using the List/Fetch pattern to push the data into HDFS, it’d look like this:

  • Root Process Group level

  • Inside the Process Group dedicated to Project A

The ListFTP is running on the primary node and sends the data to the RPG which load balances the flow files among the nodes. Flow files are pushed to the input port at the root level and the data can then be moved back down to the process group of the project. Then the FetchFTP actually retrieves the data and the data is sent to HDFS.

List/Fetch pattern with NiFi 1.8+

Now… it looks like this:

  • Root Process Group level

  • Inside the Process Group dedicated to Project A

It’s crazy, no? You don’t have anything outside of the process group anymore, the workflow is cleaner/simpler, and authorizations are much easier to manage.

Where is the trick? Did you notice the small icon on the relationship between the ListFTP and the FetchFTP? It looks small but it’s HUGE :).

Load balancing strategies in NiFi 1.8+

Let’s have a look at the configuration of a connection:

There is a new parameter available: the load balance strategy. It defaults to “do not load balance” and, unless you need to, you won’t change that parameter (you don’t want to move data between your nodes at each step of the workflow if there is no reason to do so).

Here are the available strategies:

The Round robin strategy is the one you would probably use in a List/Fetch use case. It will ensure your data is evenly balanced between your nodes.

The Single node strategy allows you to send the data back to one single node. You can see it a bit like a reducer in a MapReduce job: you process the data on all the nodes and then you want to perform a step on a single node. One example could be: I have a zip file containing hundreds of files, I unzip the file on one node, load balance all the flow files (using Round Robin strategy for example) among the nodes, process the files on all the nodes and then send back the flow files to a single node to compress back the data into a single file. It could look like this:

Then you have the Partition by attribute strategy, which sends all the flow files sharing the same value for an attribute to the same node. For example, let’s say you receive data on NiFi behind a load balancer: you might want to have all the data coming from a given group of sources on the same node.
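
As a mental model of the three strategies described above, the following Python sketch assigns flow files (plain dictionaries of attributes here) to nodes. It is a simplification under my own assumptions: the function names are invented, the single node is arbitrarily the first one in the list, and the partitioning uses a CRC32 hash, which is not necessarily what NiFi does internally.

```python
import zlib
from collections import defaultdict
from typing import Dict, List

FlowFile = Dict[str, str]


def round_robin(flow_files: List[FlowFile], nodes: List[str]) -> Dict[str, List[FlowFile]]:
    """Spread flow files evenly by cycling through the nodes."""
    placement = defaultdict(list)
    for index, flow_file in enumerate(flow_files):
        placement[nodes[index % len(nodes)]].append(flow_file)
    return dict(placement)


def single_node(flow_files: List[FlowFile], nodes: List[str]) -> Dict[str, List[FlowFile]]:
    """Send every flow file to one node (arbitrarily the first one here)."""
    return {nodes[0]: list(flow_files)}


def partition_by_attribute(
    flow_files: List[FlowFile], nodes: List[str], attribute: str
) -> Dict[str, List[FlowFile]]:
    """Send flow files sharing the same attribute value to the same node."""
    placement = defaultdict(list)
    for flow_file in flow_files:
        # A stable hash keeps equal values on the same node across runs.
        key = zlib.crc32(flow_file[attribute].encode("utf-8"))
        placement[nodes[key % len(nodes)]].append(flow_file)
    return dict(placement)
```

Note that partitioning by attribute guarantees co-location of equal values, not an even spread: with only a few distinct values, some nodes may receive nothing.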

I won’t go into much more detail, but feel free to have a look at the documentation. Besides, I’m sure other members of the Apache NiFi community will publish blog posts on this subject… such as this excellent one!

Let’s just give it a try with each strategy… I’m using a GenerateFlowFile (GFF) connected to an UpdateAttribute and we will list the flow files queuing in the relationship to check where the flow files are located. Besides, the GenerateFlowFile is configured to set a random integer between 2 and 4 for the attribute ‘filename’.

Let’s start with a GFF running on the primary node only with no load balancing strategy:

My primary node being my node2 (I have node2, node3 and node4 in my cluster):

I can confirm all the flow files are on the primary node:

Let’s change the Load Balance strategy to Round Robin. We can confirm the data is evenly distributed:

Let’s change the strategy to One single node. We can confirm the data is now back to a single node (not necessarily the primary node):

And now let’s try the partitioning by attribute using the ‘filename’ attribute. We can confirm that all the flow files sharing the same value in the Filename column are on the same node:

Again, I expect to see additional blog posts on this subject with more technical insights on how it actually works and what you should consider in case your cluster is scaling up and down.

Also… this new feature comes with another one that is just as exciting: the offloading of nodes in a cluster (look at the Admin guide in the documentation). If you want to decommission a node in a cluster, this will take care of moving its data to the other nodes before you actually remove the node for good. This will be particularly useful when deploying NiFi on Kubernetes and scaling your cluster up and down!

As always, feel free to comment and/or ask questions! Thanks for reading!

Monitoring Driven Development with NiFi 1.7+

I already discussed the Monitoring Driven Development (MDD) approach in this post and already covered some monitoring topics around NiFi in a few posts (reading those posts would probably help you read this one – or at least refresh your memory on some concepts). In this post I want to present the MDD approach once again, with some new features of NiFi 1.7 that make things a bit easier and cleaner.

What do I mean when talking about MDD in NiFi? When I say MDD, I refer to the fact that, when a team is developing a “core workflow” for a use case, the team should also develop a “monitoring workflow” that will be in charge of monitoring and reporting the health of the “core workflow” based on their business requirements.

How does this work in NiFi? This approach completely relies on two things:

  • A proper naming convention for the components used in the workflows
  • The use of the Site-to-Site Reporting Tasks (see here)

Basically, the idea is to set up the S2S reporting tasks to send data to an input port dedicated to monitoring. Basic routing and filtering is then performed, assuming projects follow the naming convention, so that all the data related to a given “core workflow” is automatically routed to the associated “monitoring workflow”. This way, it’s up to each project to implement the monitoring they want for their use case based on their requirements and SLAs.

What’s new in NiFi that’s going to help me with MDD?

There are quite a few interesting JIRAs but the two most important ones are:

  • [NIFI-4814] – Add distinctive attribute to S2S reporting tasks (NiFi 1.6.0)
  • [NIFI-5122] – Add record writer to S2S Reporting Tasks (NiFi 1.7.0)

Example

For my example, the naming convention is the following: any component that should be monitored must be named with a prefix ABC_, where ABC is the three-letter abbreviation representing the use case/project.

I have two projects, one named PJA and one named PJB.
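
The routing that this naming convention enables can be sketched in a few lines of Python. The record layout below (a dictionary with a componentName key) and the function name are assumptions for illustration only; in NiFi the routing is done with processors inside the monitoring process group.

```python
import re
from collections import defaultdict
from typing import Dict, List

# Three uppercase letters followed by an underscore, e.g. "PJA_".
PROJECT_PREFIX = re.compile(r"^([A-Z]{3})_")


def route_by_project(records: List[dict], name_key: str = "componentName") -> Dict[str, List[dict]]:
    """Group monitoring records by the project prefix of the component name.

    Records whose component name does not follow the convention are
    collected under the "unmatched" key.
    """
    routed = defaultdict(list)
    for record in records:
        match = PROJECT_PREFIX.match(record.get(name_key, ""))
        routed[match.group(1) if match else "unmatched"].append(record)
    return dict(routed)
```

With such a rule, a component named PJA_HandleHttpRequest ends up in the PJA bucket, while a component without a prefix is left out of every project-specific monitoring flow.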

Let’s say the project A is using HandleHttpRequest/Response to receive XML data over HTTP and has two monitoring requirements:

  • Send an email to the team in case a bulletin is generated
  • Monitor in Grafana the amount of valid/invalid data being processed

Let’s say the project B is using ListenTCPRecord for a use case around logs ingestion and has one monitoring requirement:

  • Send an email to the team in case back pressure is enabled

The development of the workflows is outside the scope of this post. I’ll keep things simple as the objective is to demonstrate the MDD approach, not how to implement the workflows of the specific use cases I chose.

A *very basic* implementation of the “core workflow” of PJA could be:

Screen Shot 2018-08-29 at 3.55.19 PM.png

A *very basic* implementation of the “core workflow” of PJB could be:

Screen Shot 2018-08-29 at 4.36.29 PM

Now I’m going to configure the S2S Reporting Tasks: one for Controller Status data and one for Bulletins data. Note that there is a new feature allowing users to select a Record Writer for the generated data. I’m going to use the Avro one for the reporting tasks so that I don’t have to care about the schemas at all. For that I need to create a Controller Service (in the Controller Settings menu, since this controller service is used exclusively by reporting tasks).

Screen Shot 2018-08-29 at 5.02.26 PM.png

After enabling the CS, I can configure my reporting tasks.

The one for bulletins:

Screen Shot 2018-08-29 at 5.03.59 PM

And the one for controller status:

Screen Shot 2018-08-29 at 5.05.21 PM

Note that additional S2S Reporting Tasks are available if needed. The one for provenance events can be particularly useful for some monitoring requirements such as latency.

At the root level of my canvas, I have something that looks like this:

Screen Shot 2018-08-29 at 5.45.38 PM.png

I’m receiving all the monitoring data sent by the reporting tasks in the ‘monitoring’ input port. Then, in my ‘Monitoring’ process group, I have something that looks like this:

Screen Shot 2018-08-29 at 5.46.48 PM.png

You can see that I’m sending all the data to a ‘Technical Monitoring’ process group. That’s because the team in charge of administering and monitoring the platform probably also wants to look at the monitoring data to ensure the whole system is performing OK (common examples: send the bulletins to an external tool like ES/Kibana, send email alerts when back pressure is enabled on a connection, etc).

The upper part of the workflow is where I split the monitoring data and route it by project. First, I’m using a RouteOnAttribute to route the data based on its type (bulletins, controller status, etc):

Screen Shot 2018-08-29 at 5.50.48 PM

Then I’m using an UpdateRecord processor to add a ‘project’ field that I’m computing based on the component name.

For bulletins:

Screen Shot 2018-08-29 at 5.55.34 PM

For controller status:

Screen Shot 2018-08-29 at 5.52.43 PM.png
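
To make the logic behind this step more concrete, here is a minimal Python sketch of what the ‘project’ field computation boils down to. It is not the actual UpdateRecord configuration (that one is in the screenshots); it only illustrates the naming convention. I assume here that the prefix is made of three uppercase letters followed by an underscore, and the function names and the component names in the test data are hypothetical.

```python
import re

# Assumed convention: a three-letter uppercase project code, then an underscore.
PREFIX = re.compile(r"^([A-Z]{3})_")


def project_of(component_name):
    """Return the project code, or None when the name does not follow the convention."""
    match = PREFIX.match(component_name or "")
    return match.group(1) if match else None


def add_project(record, name_field):
    """Return a copy of the record with a 'project' field derived from name_field."""
    enriched = dict(record)
    enriched["project"] = project_of(record.get(name_field))
    return enriched
```

Components that do not follow the convention end up with an empty project, so their data is only visible to the technical monitoring.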

Since the data has been sent in Avro format by the reporting task, I’m using an Avro reader Controller Service with default settings. And, from now on, I want the data to be in JSON (as it’ll be easier if I want to send the data via email or HTTP). I’m configuring a few controller services: one Avro Schema Registry containing the schemas (which you can retrieve from the additional details of the reporting task documentation, see below), one JSON Reader and one JSON Writer.

For instance, to get the schema of the S2S Bulletin Reporting Task, go to the reporting task list:

Screen Shot 2018-08-29 at 5.59.40 PM.png

Click on the documentation icon on the left:

Screen Shot 2018-08-29 at 6.00.21 PM.png

And then click Additional Details:

Screen Shot 2018-08-29 at 6.00.58 PM.png

When adding the schemas in the Avro Schema Registry, use the type of the reporting task as the name of the schema and don’t forget to add a ‘project’ field in the schemas:

Screen Shot 2018-08-29 at 6.02.28 PM.png
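
If you prefer not to edit the schemas by hand, the extra field can be appended with a few lines of Python. This is only a sketch: the function name is hypothetical, and I’m assuming the ‘project’ field should be an optional string (a null/string union with a null default), since components that don’t follow the naming convention have no project.

```python
import json


def with_project_field(schema_json):
    """Return the Avro schema (as JSON text) with an optional 'project' field added."""
    schema = json.loads(schema_json)
    existing = {field["name"] for field in schema["fields"]}
    if "project" not in existing:
        # "null" comes first in the union so that the default value can be null.
        schema["fields"].append(
            {"name": "project", "type": ["null", "string"], "default": None}
        )
    return json.dumps(schema, indent=2)
```

Paste the schema copied from the Additional Details page into the function, then use the result as the value of the schema in the Avro Schema Registry.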

Now you can easily reference the schemas to use in the configuration of the JSON Reader/Writer using the expression language because the flow files generated by the reporting tasks have an attribute with the type of the source reporting task.

Example of attributes on a flow file generated by a RT:

Screen Shot 2018-08-29 at 6.28.56 PM.png

JSON Reader:

Screen Shot 2018-08-29 at 6.03.44 PM.png

JSON Writer:

Screen Shot 2018-08-29 at 6.04.13 PM.png

Now that I have a ‘project’ field in my data with the project name (if the naming convention has been correctly used), I can use a QueryRecord processor to route the data for each project:

Screen Shot 2018-08-29 at 6.07.17 PM.png

And we can confirm that only the data for project A is routed to the ‘Monitoring Workflow’ for project A by looking at the data in the relationship that routes data for PJA:

Screen Shot 2018-08-29 at 6.08.53 PM.png
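
In QueryRecord, each dynamic property holds a SQL statement and becomes a relationship named after the property. For this use case the statement is presumably along the lines of SELECT * FROM FLOWFILE WHERE project = 'PJA' (the exact query is in the screenshot above, so take this one as an assumption). The Python sketch below mimics the resulting behaviour with hypothetical names, just to show what ends up where:

```python
def route_by_project(records, known_projects):
    """Group records by their 'project' field, one bucket per known project.

    Records with no project, or with a project that has no bucket, are not
    routed to any project (mirroring a record that matches none of the queries).
    """
    routed = {project: [] for project in known_projects}
    for record in records:
        project = record.get("project")
        if project in routed:
            routed[project].append(record)
    return routed
```

Adding a new project is then equivalent to adding one more entry to `known_projects`, which is what the new dynamic property does in the processor.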

Now, when deploying a new project for the first time (i.e. importing the process group of the “core workflow”), you just need to update the QueryRecord processor to add a dynamic property for the new project and deploy the process group for the “monitoring workflow” of the project. In combination with the NiFi Registry, this makes things really smooth when you need to update the workflows with a new version.

I’m not going to develop the monitoring workflows for PJA and PJB, but it’d only be a matter of a few record processors to filter the data (bulletin? back pressure enabled?), followed by a processor to send the data (ElasticSearch, PutEmail, InvokeHttp, etc):

  • “send an email when a bulletin is generated” – RouteOnAttribute to get the bulletins data and PutEmail to send an email
  • “monitor the number of valid/invalid XML files being processed” – filter the controller status statistics on the ‘Connection’ elements and keep only the statistics of the connections after the ValidateRecord processor. This can easily be done with a QueryRecord processor. The numbers can then be sent to an external system such as ElasticSearch for a dashboard in Kibana/Grafana
  • “send an email when back pressure is enabled” – use a QueryRecord processor on the controller status data by querying the field ‘isBackPressureEnabled’ and then use PutEmail processor.
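
As an illustration of the last requirement, here is a rough Python equivalent of the filtering that the QueryRecord processor would perform on the controller status data before handing over to PutEmail. The field ‘isBackPressureEnabled’ comes from the reporting task data; the ‘componentType’ and ‘componentName’ field names, the function names and the connection names are assumptions on my side, so check them against the schema in the Additional Details page.

```python
def is_true(value):
    # Assumption: the flag may arrive as a boolean or as the string "true",
    # depending on how the records were written.
    return value is True or str(value).lower() == "true"


def back_pressure_alerts(status_records):
    """Keep only the connection records that report back pressure."""
    return [
        record
        for record in status_records
        if record.get("componentType") == "Connection"
        and is_true(record.get("isBackPressureEnabled"))
    ]


def email_body(alerts):
    """Build a plain-text message listing the affected connections."""
    lines = ["Back pressure is enabled on:"]
    lines += ["- " + alert.get("componentName", "unknown") for alert in alerts]
    return "\n".join(lines)
```

An email would only be sent when `back_pressure_alerts` returns at least one record.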

The full workflow of this example is available as a gist here. Note that, since it’s a template, it does not contain the Reporting Tasks configuration. But you should be up and running quite quickly with the above screenshots.

As always, thanks for reading this post, feel free to comment and/or ask questions. In case you have suggestions to make things even better, just let me know and, maybe, I’ll have to publish a new version of this post to present the new features of a next release 😉