NiFi with OIDC using Terraform on the Google Cloud Platform

When I present Apache NiFi during talks or meetings, I have to quickly start and stop instances. It’s very easy to do it on your own laptop with Docker, but it’s even better to have it running in the cloud and use IAC (Infrastructure As Code).

It’s very easy to start Apache NiFi on the Google Cloud Platform in a Compute Engine instance, expose it on the Internet and have everything running. It just takes two commands and a few seconds… Go to your GCP project, start the Cloud Shell console and run the two commands below:

gcloud beta compute instances create-with-container my-nifi-instance --tags=nifi --container-image=apache/nifi
gcloud compute firewall-rules create allow-nifi-unsecured --action=ALLOW --rules=tcp:8080 --target-tags=nifi

You just started a Compute Engine instance with the latest version of Apache NiFi and exposed it to anyone on the internet. Get the external IP of your instance and you can access the UI at http://external_ip:8080/nifi.
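If you prefer the command line to the console, you can also retrieve the external IP with gcloud (add --zone if you don’t have a default zone configured):

gcloud compute instances describe my-nifi-instance \
  --format='get(networkInterfaces[0].accessConfigs[0].natIP)'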

It’s great, but you need to understand that your instance is not secured and is exposed to anyone. In short… you should never do that. At the very least, get your IP and restrict access to the instance to your own IP.
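For example, here is a minimal way to lock down the firewall rule created above to your own IP (the address below is a placeholder, replace it with the output of the first command):

# get your public IP
curl -s ifconfig.me
# restrict the firewall rule to that IP only
gcloud compute firewall-rules update allow-nifi-unsecured --source-ranges=203.0.113.10/32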

But please… Security must be a first-class citizen and the Apache NiFi community is really doing an amazing job to give you the best options to secure your instances.


In this post I show you how to use Terraform to start a secured NiFi instance configured to use OpenID Connect (OIDC) for authentication.

Note — I assume you have a domain that you own (you can get one with Google). It will be used to map a domain to the web interface exposed by NiFi. In this post, I use my own domain: pierrevillard.com and will map nifi.pierrevillard.com to my NiFi instance.

Disclaimer — the below steps should not be used for a production instance, I’m just using the below to start a secured instance with a single user access for short demos (there is no configuration that one would expect for a production or long-lived instance).


OAuth Credentials

The first step is to create the OAuth credentials (at the moment, this cannot be done using Terraform).

Once the credentials are created, you will get a client ID and a client secret that you will need in the Terraform variables.

By creating the credentials, your domain will be automatically added to the list of the “Authorized domains” in the OAuth consent screen configuration. It protects you and your users by ensuring that OAuth authentication is only coming from authorized domains.


Deploy NiFi with Terraform

I’ll go a bit deeper into what I’m doing in the next parts; below are just the commands to deploy everything. Go to your GCP project and start the Cloud Shell console.

git clone https://github.com/pvillard31/nifi-gcp-terraform.git
cd nifi-gcp-terraform/gcp-single-secured-nifi-oidc/
terraform init
terraform apply

When applying the Terraform configuration, it’ll ask for some information (a non-interactive variant using -var flags is sketched after this list):

  • The token to be used between the NiFi CA and the NiFi instance to generate certificates. You can use a random string which is at least 16 bytes long.
  • The Google email address of the user that will be the initial admin for the NiFi instance.
  • The OAuth Client ID and Secret you got before.
  • And the sub-domain that you will configure and use to access your NiFi instance. In my case: nifi.pierrevillard.com.
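If you’d rather not answer the prompts interactively, you can generate the token and pass everything on the command line. The variable names below are only illustrative, check variables.tf in the repository for the exact names:

# generate a random token for the NiFi CA (at least 16 bytes)
openssl rand -hex 16

# pass the values non-interactively (variable names are illustrative)
terraform apply \
  -var="ca_token=<generated token>" \
  -var="admin_email=you@gmail.com" \
  -var="oauth_client_id=<client id>" \
  -var="oauth_client_secret=<client secret>" \
  -var="hostname=nifi.pierrevillard.com"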

Access NiFi

Once the Terraform configuration is applied, you need to map your subdomain to the static IP of the NiFi instance. Go to your GCP project, open the Compute Engine page and get the external IP of the NiFi instance:

Once you have the external IP, go to your DNS provider page and add an ‘A’ record to your DNS records with the subdomain pointing to the external IP. The exact steps depend on your DNS provider.
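If your domain happens to be managed in a Cloud DNS zone, the same record can be added with gcloud (the zone name below is a placeholder):

gcloud dns record-sets transaction start --zone=my-zone
gcloud dns record-sets transaction add <external IP> \
  --zone=my-zone --name=nifi.pierrevillard.com. --type=A --ttl=300
gcloud dns record-sets transaction execute --zone=my-zone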

Once done, you should be able to access the NiFi UI using your subdomain on port 8443: https://nifi.pierrevillard.com:8443/nifi.

You will most probably get a warning from your browser because of the untrusted certificate authority. That’s because we generated a CA certificate to sign the NiFi certificate. You can ignore the warning for a demo but otherwise you should use a trusted CA certificate:

Once you proceed to the website, you’ll be redirected to the Google authentication page asking for your credentials. That’s because we configured NiFi to use OpenID Connect to delegate the authentication to Google. At this point, you can only authenticate using the Google address you provided as initial admin for NiFi:

Then you are connected as the user and can access the canvas:

Before being able to design your first workflow, you’ll need to go to the “Policies” menu to grant yourself the required permissions. You can also go to the “Users” menu to add additional users that will be able to authenticate on the UI and give them the appropriate permissions.


Details

The Terraform configuration files are on Github:

  • provider.tf to define the Google Cloud provider with the basic GCP project information
  • network.tf to create a network and subnetwork dedicated to the NiFi CA instance and the NiFi instance
  • firewall.tf to create the rules to allow internal communications, SSH access to the instances and access to the NiFi instance on port 8443
  • nifi-ca.tf to install the NiFi Certificate Authority (provided with the TLS toolkit) in server mode in order to create a certificate authority and sign the certificates for the NiFi instance
  • nifi.tf to install the NiFi instance, get the certificate from the NiFi CA, generate the keystore and truststore and configure the NiFi instance to be secured and use OpenID Connect for authentication
  • variables.tf to define some variables to be used to customize the deployment (more variables could be added)

Remember, this is a basic deployment of NiFi but you have a secured instance with Google delegated authentication.

Note — To delete all the created resources, you can use ‘terraform destroy’.

There is much more to do to get closer to a production ready deployment but it gets you started to play with NiFi on the Google Cloud Platform. I’ll add more features in my next posts (the immediate next step will be to add a secured NiFi Registry instance that is connected to the Google Cloud Source Repositories).

Thanks for reading, feel free to ask questions or comment on this post!

NiFi & NiFi Registry on the Google Cloud Platform with Cloud Source Repositories


This post is about quickly and easily deploying an unsecured instance of NiFi and an unsecured instance of the NiFi Registry which uses the Cloud Source Repositories service as backend for the flow persistence provider.

The objective is to quickly deploy NiFi and the NiFi Registry, connect the two together, version the workflows in the Source Repositories, and be up and running quickly to start building workflows. This is not suitable for production deployment as we are not securing the instances (I’ll talk about that in another post).

This post is also about a new feature in NiFi Registry 0.4.0 (NIFIREG-209) which allows the NiFi Registry to rebuild all of its metadata from an existing Git repository of flows. It’s a very nice feature when you start and stop NiFi instances on the fly while still having easy access to your versioned flows. Actually, using this feature, we could run the NiFi Registry on Google Cloud Run and have the production NiFi instances just pull the flow versions from the NiFi Registry exposed by Cloud Run. By doing that, you would leverage the advantages of serverless. If you are interested in Google Cloud Run, you might also be interested in this post about running NiFi workflows in Cloud Run.


Setup Source Repository

I start by creating a fresh new project in my Google Cloud Platform console. I call this new project ‘nifi-registry’. Once the project is created, I go into Source Repositories. If it’s your first time, click ‘Get started’ and ‘Create repository’.

Source Repositories is the Google Cloud offering that gives you free unlimited private Git repositories to organize your code in a way that works best for you (you can also mirror code from GitHub or Bitbucket repositories to get powerful code search, code browsing, and diagnostics capabilities). It also nicely integrates with CI/CD tools.

In my case, I create a new repository that I call ‘nifi-flow-repository’.
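The repository can also be created from the Cloud Shell instead of the console (assuming the Source Repositories API is enabled on the project):

gcloud source repos create nifi-flow-repository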

Let’s now set up the SSH key to allow access to the repository.

$ ssh-keygen -t rsa -b 4096 -m PEM -C "NiFi Registry"

Generating public/private rsa key pair.
Enter file in which to save the key (~/.ssh/id_rsa): ~/temp/id_rsa
Enter passphrase (empty for no passphrase): 
Enter same passphrase again: 
Your identification has been saved in ~/temp/id_rsa.
Your public key has been saved in ~/temp/id_rsa.pub.
The key fingerprint is:
SHA256:/hu6FZLvcvDigP4ixPFPEGzfpY1RHOoTRytjqYOoYHo NiFi Registry
The key's randomart image is:
+---[RSA 4096]----+
|   .     o+.     |
|    +   .+o.     |
|   . o .B*o      |
|  .....++*.      |
|.o.o..o S .      |
|+.o ...o.+ .     |
|o.E .o. .o+      |
| .... ..o=o.     |
|   ..o..+=+.     |
+----[SHA256]-----+


$ ls
id_rsa     id_rsa.pub

For this demo to work, we generate a PEM-encoded key and use an empty passphrase (again, this is not ideal for production). Once done, you can register the SSH key with Google Cloud (there is a link available after you create the repository). You just have to give the key a name and paste the content of the generated id_rsa.pub file.

Start the NiFi Registry

We can now focus on starting the NiFi Registry. To be up and running very quickly, I’m going to rely on the Docker image provided by Apache NiFi and use it in a simple Compute Engine instance with Docker enabled.

In Compute Engine / Instance templates, you can create a new template. Here is my setup with the parameters I changed (adapt it to your needs):

  • Name: nifi-registry
  • Check “Deploy a container image to this VM instance”
  • Container image: apache/nifi-registry
  • Go into “Advanced container options”
  • In the volume mounts (use Directory as volume type), declare the mounts listed in the comments at the top of the startup script below
  • Go into “Management, security, disks, networking, sole tenancy”
  • Add the startup script below (a gcloud command-line equivalent of this template is sketched after the script):
#! /bin/bash

# This script is used when starting a docker image based GCE instance
# of the NiFi Registry. It is intended to configure the NiFi Registry
# so that the persistence provider is the Google Cloud Source Repo.

# Docker volumes (directory type)
# /tmp/config => /home/nifi/.ssh - Read only
# /tmp/ssh/id_rsa => /id_rsa - Read only
# /tmp/nifi-flow-repository => /nifi-flow-repository - Read/Write
# /tmp/providers.xml => /opt/nifi-registry/nifi-registry-0.4.0/conf/providers.xml - Read only

# Note that 1000 is uid/gid for NiFi user/group in the Docker container

# create directory for SSH keys
mkdir /tmp/ssh

# private SSH key to authenticate against the Google Cloud Source Repo
# its associated public key needs to be registered on Source Repo
# this is the content of the id_rsa file we generated, change it with yours!
touch /tmp/ssh/id_rsa && chmod 600 /tmp/ssh/id_rsa
cat <<EOF >> /tmp/ssh/id_rsa
-----BEGIN RSA PRIVATE KEY-----
MIIJJwIBAAKCAgEAxRVsCHEUnKJXZnJ8GZVb/D0bvCrzK/p8jYiuleUqCgrqJ+2D
hocKkLqPKU5yiiA11vDCgrn7GBkWUH+Naj6rc9qZviwJtuZFjfKt+gg9++JLsnQn
8TdfbZ7nKZ2fh397Yr50mLP9wJw6A0nd68Y9YowRID64ZJ+kfqtnr5gaU5wI6/j+
UEV8QO2hpQfJ+hI0TUe82kY3l21J4FcLGbKsZSwMjRDRIG9fly4adEVsPq+WZFJ1
6q8ZsGfLpArCORQiM81mBFTeeLn65FZwDEsAIko3Fv+OVBjv+YtpaUOL4QwM9E3X
9v+0WqkKG0MYm12FjHkulIdHbO9gVIXKM8sIyI+J49fxWiLnb+HiYExCvdgaKCAf
eR12zIjTXHrX8YHF5uxfE0oCMgPcZIcsSVO0zjNNKniJy4AWU46hxB8USI7Kf9jD
i/YVRmDi67mXSi6z8FjamccIWg8CnWVkLTr771ERZATZAn7YDY7Zu8Z+AXE6U12o
7hkcB940B+hHnHrB32iKiOev+NLR+NCde1YUBHfaw6vmmsBwrmO9H/cSGz9L1aC/
seSOBrnWef/YZXAXtMb1maSGpVEW+74uR05UL5Bt5xlB+MAQGRcDWkMJ08teqYGZ
IOgqqeESihXq2Suk1+nMhO/kUHdUBHVHXBmQTqtRMKjMyyRaM/rU/BwF0PECAwEA
AQKCAgBEFI1YMS/sl8vXUO77q3O2I6nkC3YrGAFUpwWgNCSciX6vwkVwAFAvBLMV
ksrQWiYfFNYscHCDt47Uyesg63swry/y2KWWb99RFKbyu0wmKdr6T1PI6NbnOEAk
SRXlqa0GxEHkyjB7C7yijC7EFpv34ei8mc36vIcHVtCfgMx/W2Rdl4rKUeaFS1FO
f+1OnGFC3OgpAp6Lia4+d/MWsnkJDflb9ZY4PSDpSXzL83tcAC3UhAPFySz3mjNn
fGvxaboS6P43utWLILvBivZz2Ki4L6juIeOZu2+SZY1JVpMIb7A96HIVGenEc6ZR
GpFyghDzIJ1to3mR8PL236ykzZs/iRbRGOvHSHNFiAmKMIXqeYe+Q6u/9mGVo68J
Vbw0PeTWaaPnnBM3iVi2d3STGsI6x2bTtosDyRS5YLNztj0Y8mOSWQz8JvnTcikM
lKBcSbA3K/ophzSyjk8sN65HqM1JQ0v/bK6w8TtZBLThPwcqRtbvYQBCQC7g74Oe
5KAuJRDTKDSxkLlD9AX0hdMIMIVKNd5h8Sx7vlX6DZ/5PdLT87eIS39mQlXNjE6W
kvGPte1YlTqWFECmxHlxvSEMB6txEzUrfEVLpTdKkpnygQFJfZdKzTiIsFiqKn9s
s6IAuzyHBJ6MMUdstZ8DMMnE/uXWU3ouUemSLcRiPMXi5AlZkQKCAQEA+EuQ75un
9PS/D6pCcOrZaodK0J2Xz4KX2VkuEJeP/KmDAFwcB0HzpQO6zQglIf8Sv9r3/g10
qk6pt5Y+gqBnpQOV/uUgN7U0lTMAnxx4l3RxXKZW3cqEYL326ouzRKDjz7TxYiWK
T+RT4QlK0jm7NfqZ8trqwi4dB4Nz5JA4MItoSinyWYzRfnaNH8jBxYmRyDGWQLsq
y2Dep6AZ2GPez473xl6TdwTc47UnDYbyuXdhK4yV+Jr9quYUkda+gFVlh7ZEK/vi
WsBoh0IjG07C4m8rNdxHnfplH9MIFo+I2u+hk6Ot4hpTc9b+Vj93FidSnboOEjDh
YD6MM1A8gVJdtQKCAQEAyzMJVHOAWq0F14h6vUY9m8lkw3KBZorfTsDD39kG3P7C
lh/VCCqk7Uz6drkPc6Gb/xp1LgCDhdxxNISos/92IINFwTmg1bQpaCBsfgkM+r2g
S87hUdCL3NCLwsrS67oQjalEAVu8TZ2obCZ48EgasiY48tq7SrQ7+tW7PSjnO5pv
pM661m5Gmp0/y7+0CkPg2JyGp4mwlYY5PgHUckpjwl7dHiDqTSYypOH0rX8i0UF7
NlOpWIFK1/o3K5YzenJwLIm9tdhG+AJ4ZXUOL+ihoUp+uyac33jCAsg34n2spEgi
F/5V8SZ/oYImBeqZJBSf/eOllAVQMHoNyU3F7SILzQKCAQAul6+I5wKresnnnKF/
IvLNhLnLT+dO70ijZpK4VliUpxKIHMC9s+iOjJafJuog6QaRrftfVxMPald6tdzh
EkygsH2TKVfUXFKTtNBnCyat9RHYuvYOBJS2uq788F4hgLd/IIszSawctdHvppi9
vkudI3uEEQSAj7qu0EINH+sLYP2e/SQXHc+qFYEB9+A0u0357SQu3XB4XaMqfWac
LpF/DWr9dH3jlawFpta/ORWPLBG15Fm4Hw1+5lHx1ARHfL7iqpc8UbX2Jaj3yLdh
xnPXndjT8JQX1wbm4+jeouyheNovJEXa6enDERMFCD/GjnZ4VpORYk7IirQwZNwV
wGkJAoIBACYlU5gOAseC+bjHfzsvB3vKZ+clBNPKODehimPoaxhrnv3txeE9mC6Q
J+jHvvXXHeDbB6p2IDqt8naIfN8lkvhxjFPEzMOxiaBpjdRvQIeFt6Tjlnr0an0u
jT4pM0VbbaYaY5DZttTfRvHemw+IibJt6Hz2wPg6M5RYMUk+94HB8TmAMaT1mL2H
zaUjPNo8eeZQJBsphwPa6b4RO9+kxWuEwX/ZcALBq/o2DOfRGSktYMMHG6BozTMU
Xu0IymsvNo214e5URqZiWFW7jslBo64SvQ3HJuXw1oMNMSiMrS8992CHt3yI3Kbm
NtsfelZCpPJVnQzXnoErOJFUz1Y/8PUCggEASI2Uky/TDLCU5Inm2hN1zzozF/kh
pvKks4Kd93BSiMWDHg07HtfkGPBINQdwGScLuTK63uHEuVTB1e1VM+Kd+FqoddoA
xEfdFKoDMgyhE2KNg9iiePbkeXzCCY9yFD2xQ+zfMdqd/755Fg356IfETSx59WzO
vNTQShrwj5zOE1H3wl+YYsAWA1AiHslzwVp5pO26KhQLMO90q5j2g7gJEWOfhUDi
bkhJu3rXdJpiYOCngy0vKiSFYrvFfYh4hZ4+mL/TzWIuJtC0zmSetk1VwOYBkwJZ
Uo1bAaMdFJMC/QAWiJv95jGuusV39nT7pxd93sW1Iiv9OXnsxj8OeShEVg==
-----END RSA PRIVATE KEY-----
EOF

# clone the Google Cloud Source Repository
cd /tmp
ssh-agent bash -c 'ssh-add /tmp/ssh/id_rsa; git clone ssh://admin@pierrevillard.com@source.developers.google.com:2022/p/nifi-registry-245014/r/nifi-flow-repository'
chmod 755 /tmp/nifi-flow-repository
chown -R 1000:1000 /tmp/nifi-flow-repository

# Create the providers configuration for the NiFi Registry
# no user/password because we use SSH authentication
cat <<EOF > /tmp/providers.xml
<providers>
  <flowPersistenceProvider>
    <class>org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider</class>
    <property name="Flow Storage Directory">/nifi-flow-repository</property>
    <property name="Remote To Push">origin</property>
    <property name="Remote Access User"></property>
    <property name="Remote Access Password"></property>
  </flowPersistenceProvider>
  <extensionBundlePersistenceProvider>
    <class>org.apache.nifi.registry.provider.extension.FileSystemBundlePersistenceProvider</class>
    <property name="Extension Bundle Storage Directory">./extension_bundles</property>
  </extensionBundlePersistenceProvider>
</providers>
EOF
chown 1000:1000 /tmp/providers.xml

# .ssh/config and .ssh/known_hosts files
mkdir /tmp/config
cat <<EOF >> /tmp/config/config
Host source.developers.google.com
  HostName source.developers.google.com
  IdentityFile /id_rsa
EOF
ssh-keyscan -p 2022 source.developers.google.com >> /tmp/config/known_hosts
chown -R 1000:1000 /tmp/config

# change the ownership of the private key to allow access by the 'nifi' user inside the container
chown 1000:1000 /tmp/ssh/id_rsa
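As an alternative to clicking through the console, roughly the same template can be created with gcloud. This is only a sketch: the startup script above is assumed to be saved locally as startup-script.sh and the mounts mirror the comments at the top of the script, adjust them to your setup:

gcloud compute instance-templates create-with-container nifi-registry \
  --container-image=apache/nifi-registry \
  --metadata-from-file=startup-script=startup-script.sh \
  --container-mount-host-path=host-path=/tmp/config,mount-path=/home/nifi/.ssh,mode=ro \
  --container-mount-host-path=host-path=/tmp/ssh/id_rsa,mount-path=/id_rsa,mode=ro \
  --container-mount-host-path=host-path=/tmp/nifi-flow-repository,mount-path=/nifi-flow-repository,mode=rw \
  --container-mount-host-path=host-path=/tmp/providers.xml,mount-path=/opt/nifi-registry/nifi-registry-0.4.0/conf/providers.xml,mode=ro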

Note 1 — we are using templates to get up and running very quickly each time you want to start a new instance with the same configuration.

Note 2 — the above approach is not recommended as we are copying/pasting the private key into the startup script, but this is due to the restrictions that come with the Container-Optimized OS used for this demo. In a better world, we would use Cloud Build to build our own NiFi Registry image and use it instead. Or we could deploy the public image on Google Kubernetes Engine and use secrets.

Once your template is created you can open it and click “Create VM”:

Then you can give a name to your instance (let’s say ‘nifi-registry’) and start it. You should have an instance up and running:
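The VM can also be created from the template with gcloud rather than the console (the zone below is a placeholder):

gcloud compute instances create nifi-registry \
  --source-instance-template=nifi-registry --zone=europe-west1-b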

After configuring the proper firewall rule to allow access from your personal network to the instance on port 18080, you should be able to access the NiFi Registry at http://<external IP>:18080/nifi-registry:
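A rule similar to the one used earlier for NiFi does the job; here is a sketch restricted to a placeholder IP (no target tags are set, so it applies to every instance in the network, which is fine for this demo):

gcloud compute firewall-rules create allow-nifi-registry --action=ALLOW --rules=tcp:18080 --source-ranges=<your public IP>/32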

You can go in Settings (top right) and create a new bucket:

You now have a NiFi Registry up and running and you have initialized your first bucket. We can now deploy a NiFi instance, connect it to the Registry and create our first workflow.

Start a standalone NiFi instance

It’s very easy! Just go in Compute Engine / VM instances and click “Create instance”. Then just give a proper name to your instance and configure it to use the NiFi Docker image:

Start your VM and wait a few minutes. After configuring the firewall rule to allow access from your personal network, you should be able to access NiFi on port 8080:

Go into the top-right hamburger menu and go into Controller Settings. Then go into the Registry Clients tab and click the + button to configure your registry:

You can now add a Process Group into the canvas, right click on it and start versioning:

You will notice that we can see the bucket we created in the Registry. We can give a name to our workflow, a description, and a commit message for this version.

Once we click save, we have the confirmation that the workflow has been correctly versioned:

We can check in our Cloud Source Repository that we do have data:

That’s it. You can now create a more complex workflow and commit the new version into the Registry; it will be saved into your repository. Even better, if you kill your NiFi Registry instance and start a new one, you will be able to keep working and pull all the workflows you previously stored in the repository — all the metadata will be generated from the repository data at startup.
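You can also clone the repository locally to inspect the flow snapshots stored by the Registry (the project ID below is the one used in the startup script):

gcloud source repos clone nifi-flow-repository --project=nifi-registry-245014
ls nifi-flow-repository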

There is much more to come about NiFi on Google Cloud, stay tuned! Thanks for reading and feel free to comment and/or ask questions.

Deploying NiFi workflows on Google Cloud Run

Google Cloud just launched a new managed service called Cloud Run, which is currently in public beta.

Cloud Run is a managed compute platform that enables you to run stateless containers that are invocable via HTTP requests. Cloud Run is serverless: it abstracts away all infrastructure management, so you can focus on what matters most — building great applications. It is built on Knative and lets you run your containers fully managed.

The huge benefit of this approach is that if there is no call to your service, there won’t be any container running and you won’t be charged. When a call is received, a container is instantiated to serve the request. If there are new requests, the container remains alive until there are no more requests to answer. Besides, if traffic increases, your app will scale and more containers will be instantiated.

Also, when deploying a new version (revision) of your service, requests are automatically routed as soon as possible to the latest healthy service revision. This makes the whole CI/CD and service lifecycle much easier.

I wanted to play with this new service using NiFi: the idea is to have a workflow starting with HandleHttpRequest processor and ending with HandleHttpResponse processor to easily develop “functions” and deploy the web services in Cloud Run.

Note – everything used for this demo is available on this Github repo.

The workflow

Here is a simple workflow I developed and deployed on Cloud Run: it expects CSV data as input and converts the CSV data into JSON.

I want to emphasize that this is a very simple case. But think about all the great features you can use in NiFi following this approach. Possible use cases are pretty much infinite.

Basically, once the workflow is running I can use a POST HTTP request to get my JSON:

$ cat test.csv
name,company
Pierre Villard,Google

$ curl --data-binary "@test.csv" -H "Content-Type: text/csv" -X POST http://localhost:9090/
[ {
  "name" : "Pierre Villard",
  "company" : "Google"
} ]

This is a very simple workflow:

  • HandleHttpRequest – will start listening on a given port for HTTP(S) requests
  • ConvertRecord – will convert my CSV data into JSON using the header to infer the schema
  • HandleHttpResponse – will return the result to the user
Simple workflow to convert CSV to JSON using a HTTP endpoint

Considerations for the container

The container must be stateless and listen for HTTP requests on the port defined by $PORT, which is passed to the container as an environment variable. Have a look at the container runtime contract.

In the NiFi world, that means that the property defining the listening port in the HandleHttpRequest processor must accept Expression Language. Unfortunately, that was not the case, so I submitted a fix with NIFI-6144. However, in Cloud Run container instances, the PORT environment variable is always set to 8080 (but for portability reasons, your code should not hardcode this value). This means you don’t strictly need the fix I submitted to play with Cloud Run and NiFi, but it’s safer if you do.

The container, when started, must be able to serve the request within 4 minutes, which means we want a container that starts as quickly as possible and has a low footprint. That’s why I chose to work with the MiNiFi Docker image instead of the NiFi one.

Converting the workflow for MiNiFi

Once the workflow is developed, you can save it as a template (XML file) and use the MiNiFi toolkit to convert the XML file into a YAML file usable by MiNiFi.

# command to convert the template into yml file for MiNiFi
$MINIFI_HOME/bin/config.sh transform templateConvertRecord.xml config.yml

You can have a look at the YAML file on the Github repository.

Building the container

By default, the MiNiFi Docker image comes with a limited set of features and it’s up to you to add the elements you need to run your workflow if required. In my case, I added:

  • a custom build of the standard processors NAR to include my fix,
  • the HTTP context NAR that is required when using HandleHttpRequest/Response processors,
  • and the Record serialization NAR required for the ConvertRecord processor.

Here is my Dockerfile:

FROM apache/nifi-minifi:latest
USER root

ENV MINIFI_HOME /opt/minifi/minifi-0.5.0

ADD config.yml $MINIFI_HOME/conf/config.yml
ADD *.nar $MINIFI_HOME/lib/

RUN chown -R minifi:minifi $MINIFI_HOME

USER minifi

You then need to create your image and push it to the Container Registry service in the Google Cloud Platform. You can have a look at the documentation here.

git clone git@github.com:pvillard31/nifi-cloudrun-example.git
cd nifi-cloudrun-example
gcloud auth configure-docker
# docker build -t [HOSTNAME]/[PROJECT-ID]/[IMAGE]:[TAG] .
docker build -t eu.gcr.io/my-gcp-project/minifi-csvjson:0.0.1 .
# docker push [HOSTNAME]/[PROJECT-ID]/[IMAGE]:[TAG]
docker push eu.gcr.io/my-gcp-project/minifi-csvjson:0.0.1
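To sanity check the image locally, you can emulate the Cloud Run port contract; this is only a rough check and assumes the Listening Port property is set to ${PORT} in config.yml (the host port 9090 is arbitrary):

docker run -d -e PORT=8080 -p 9090:8080 eu.gcr.io/my-gcp-project/minifi-csvjson:0.0.1
# give MiNiFi a few seconds to start, then:
curl --data-binary "@test.csv" -H "Content-Type: text/csv" -X POST http://localhost:9090/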

Deploy in Cloud Run

Once your Docker image is pushed to the Container Registry, you can go to the Cloud Run service and deploy your service!

Click on Create Service and fill in the information:

Create a service in Cloud Run
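The deployment can also be scripted; a roughly equivalent gcloud command could look like the one below. The service name and region are inferred from the service URL shown later, and the memory limit is just a guess to adjust to your workflow:

gcloud beta run deploy minifi-csvjson \
  --image=eu.gcr.io/my-gcp-project/minifi-csvjson:0.0.1 \
  --region=us-central1 --memory=512Mi --allow-unauthenticated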

Once your service is deployed, you will get an HTTPS URL exposing it:

I can now directly send requests to the exposed service:

$ time curl --data-binary "@test.csv" -H "Content-Type: text/csv" -X POST https://minifi-csvjson-t546z2l6aq-uc.a.run.app
[ {
  "name" : "Pierre Villard",
  "company" : "Google"
} ]
real	0m0.725s
user	0m0.036s
sys	0m0.026s

And I can also monitor my service in Stackdriver:

Conclusion

That’s it! You know how to deploy a new service in Cloud Run using the features of NiFi to develop your workflow and using the MiNiFi Docker image for running your containers.

Hopefully this demonstrates how easy it is to create and deploy a service, with everything managed for you in terms of scaling and lifecycle… and you pay only for what you really use!

In combination with the NiFi Registry and all the tools you have on the Google Cloud Platform you can have a very neat pipeline to automate all of it and deploy new services in seconds!

As always, feel free to ask questions and/or leave a comment.

NiFi 1.8+ – Revolutionizing the List/Fetch pattern and more…

If you read my post about the List/Fetch pattern and you’re using this approach for some of your workflows, this new feature coming with NiFi 1.8.0 is going to be a revolution.

A quick recap about the context: in NiFi, unless you specifically do something to make the nodes of a NiFi cluster exchange data, a flow file will remain on the same node from the beginning to the end of the workflow. For some use cases, it is necessary to load-balance the flow files among the nodes of the cluster to take advantage of all the nodes (like the List/Fetch pattern).

Before NiFi 1.8, the only way to load balance data in a NiFi cluster was to use the Site-to-Site (S2S) protocol with a Remote Process Group (RPG) connected to the cluster itself through an input/output port. In addition, S2S forces you to define the input/output port at the root level of the canvas. In a multi-tenant environment this can be a bit annoying and can make the workflows a little more complex.

What’s the revolution with NiFi 1.8+? Intra-cluster load balancing of flow files can now be configured directly on the connection between two processors. It might sound like a minor thing, but that’s HUGE! In addition, you have a few options to configure the load balancing, which opens up new possibilities for new use cases!

The List/Fetch pattern is described in my previous post: in short… it’s the action to use a first processor (ListX) only running on the primary node to list the available data on X and generate one flow file per object to retrieve on X (the flow file does not have any content but contains the metadata to be used to fetch the object), then flow files are distributed among the NiFi nodes and then the FetchX processor running on all nodes will take care of actually retrieving the data. This way you ensure there is no concurrent access to the same object and you distribute the work in your cluster.

List/Fetch pattern before NiFi 1.8.0

If we have a project A retrieving data from an FTP server using the List/Fetch pattern to push the data into HDFS, it’d look like this:

  • Root Process Group level

Screen Shot 2018-10-18 at 10.03.23 PM.png

  • Inside the Process Group dedicated to Project A

Screen Shot 2018-10-18 at 10.04.01 PM.png

The ListFTP is running on the primary node and sends the data to the RPG which load balances the flow files among the nodes. Flow files are pushed to the input port at the root level and the data can then be moved back down to the process group of the project. Then the FetchFTP actually retrieves the data and the data is sent to HDFS.

List/Fetch pattern with NiFi 1.8+

Now… it looks like this:

  • Root Process Group level

Screen Shot 2018-10-18 at 10.09.29 PM

  • Inside the Process Group dedicated to Project A

Screen Shot 2018-10-18 at 10.10.30 PM

It’s crazy, no? You don’t have anything outside of the process group anymore, the workflow is cleaner/simpler, and authorizations are much easier to manage.

Where is the trick? Did you notice the small icon on the relationship between the ListFTP and the FetchFTP? It looks small but it’s HUGE :).

Load balancing strategies in NiFi 1.8+

Let’s have a look at the configuration of a connection:

Screen Shot 2018-10-18 at 10.18.24 PM.png

There is a new parameter available: the load balance strategy. It defaults to “Do not load balance” and, unless you need to, you won’t change that parameter (you don’t want to move data between your nodes at each step of the workflow if there is no reason to do so).

Here are the available strategies:

Screen Shot 2018-10-18 at 10.20.24 PM

The Round robin strategy is the one you would probably use in a List/Fetch use case. It will ensure your data is evenly balanced between your nodes.

The Single node strategy allows you to send the data back to one single node. You can see it a bit like a reducer in a MapReduce job: you process the data on all the nodes and then you want to perform a step on a single node. One example could be: I have a zip file containing hundreds of files, I unzip the file on one node, load balance all the flow files (using Round Robin strategy for example) among the nodes, process the files on all the nodes and then send back the flow files to a single node to compress back the data into a single file. It could look like this:

Screen Shot 2018-10-18 at 10.31.08 PM.png

Then you have the Partition by attribute strategy, which allows you to have all the flow files sharing the same value for an attribute sent to the same node. For example, let’s say you receive data on NiFi behind a load balancer; you might want to have all the data coming from a given group of sources on the same node.

I won’t go into much more detail, but feel free to have a look at the documentation. Besides, I’m sure other members of the Apache NiFi community will publish blog posts on this subject… such as this excellent one!

Let’s just give it a try with each strategy… I’m using a GenerateFlowFile (GFF) connected to an UpdateAttribute and we will list the flow files queued in the connection to check where the flow files are located. Also, the GenerateFlowFile is configured to set a random integer between 2 and 4 for the attribute ‘filename’.

Let’s start with a GFF running on the primary node only with no load balancing strategy:

Screen Shot 2018-10-18 at 10.44.06 PM.png

My primary node being my node2 (I have node2, node3 and node4 in my cluster):

Screen Shot 2018-10-18 at 10.45.05 PM.png

I can confirm all the flow files are on the primary node:

Screen Shot 2018-10-18 at 10.46.37 PM.png

Let’s change the Load Balance strategy to Round Robin. We can confirm the data is evenly distributed:

Screen Shot 2018-10-18 at 10.50.02 PM

Let’s change the strategy to Single node. We can confirm the data is now back on a single node (not necessarily the primary node):

Screen Shot 2018-10-18 at 10.53.22 PM.png

And now let’s try the partitioning by attribute using the ‘filename’ attribute. We can confirm that all the flow files sharing the same value in the Filename column are on the same node:

Screen Shot 2018-10-18 at 10.55.21 PM.png

Screen Shot 2018-10-18 at 10.56.12 PM.png

Again, I expect to see additional blog posts on this subject with more technical insights on how it actually works and what you should consider in case your cluster is scaling up and down.

Also… this new feature comes with another one which is just as exciting: the offloading of nodes in a cluster (look at the Admin Guide in the documentation). In case you want to decommission a node in a cluster, this will take care of moving the data back to the other nodes before you actually remove the node for good. This will be particularly useful when deploying NiFi on Kubernetes and scaling your cluster up and down!

As always, feel free to comment and/or ask questions! Thanks for reading!

Monitoring Driven Development with NiFi 1.7+

I already discussed the Monitoring Driven Development (MDD) approach in this post and already covered some monitoring topics around NiFi in a few posts (reading them would probably help with this one – or at least refresh your memory on some concepts). In this post I want to present once again the MDD approach with some new features of NiFi 1.7 making things a bit easier and cleaner.

What do I mean when talking about MDD in NiFi? When I say MDD, I refer to the fact that, when a team is developing a “core workflow” for a use case, the team should also develop a “monitoring workflow” that will be in charge of monitoring and reporting the health of the “core workflow” based on their business requirements.

How does this work in NiFi? This approach completely relies on two things:

  • A proper naming convention for the components used in the workflows
  • The use of the Site-to-Site Reporting Tasks (see here)

Basically, the idea is to set up the S2S reporting tasks to send data to an input port dedicated to monitoring; then basic routing and filtering is performed, assuming projects are following the naming convention, so that all the data related to a given “core workflow” is automatically routed to the associated “monitoring workflow”. This way, it’s up to each project to implement the monitoring they want for their use case based on the requirements and SLA they have.

What’s new in NiFi that’s going to help me with MDD?

There are quite a few interesting JIRAs, but the two most important ones are:

  • [NIFI-4814] – Add distinctive attribute to S2S reporting tasks (NiFi 1.6.0)
  • [NIFI-5122] – Add record writer to S2S Reporting Tasks (NiFi 1.7.0)

Example

For my example, the naming convention is the following: any component that should be monitored should be named with a prefix ABC_ where ABC is the three-letter abbreviation representing the use case/project.

I have two projects, one named PJA and one named PJB.

Let’s say the project A is using HandleHttpRequest/Response to receive XML data over HTTP and has two monitoring requirements:

  • Send an email to the team in case a bulletin is generated
  • Monitor in Grafana the amount of valid/invalid data being processed

Let’s say the project B is using ListenTCPRecord for a use case around logs ingestion and has one monitoring requirement:

  • Send an email to the team in case back pressure is enabled

The development of the workflows is outside the scope of this post. I’ll keep things simple as the objective is to demonstrate the MDD approach, not how to implement the workflows of the specific use cases I chose.

A *very basic* implementation of the “core workflow” of PJA could be:

Screen Shot 2018-08-29 at 3.55.19 PM.png

A *very basic* implementation of the “core workflow” of PJB could be:

Screen Shot 2018-08-29 at 4.36.29 PM

Now I’m going to configure the S2S Reporting Tasks – one for Controller Status data and one for Bulletins data. Note that there is a new feature allowing users to select a Record Writer for the generated data. I’m going to use the Avro one for the reporting tasks so that I don’t have to care about the schemas at all. For that I need to create a Controller Service (in Controller Settings menu since this controller service is exclusively used by reporting tasks).

Screen Shot 2018-08-29 at 5.02.26 PM.png

After enabling the CS, I can configure my reporting tasks.

The one for bulletins:

Screen Shot 2018-08-29 at 5.03.59 PM

And the one for controller status:

Screen Shot 2018-08-29 at 5.05.21 PM

Note that additional S2S Reporting Tasks are available if needed. The one about provenance events can be particularly useful for some monitoring requirements such as latency.

At the root level of my canvas, I have something that looks like this:

Screen Shot 2018-08-29 at 5.45.38 PM.png

I’m receiving all the monitoring data sent by the reporting tasks in the ‘monitoring’ input port. Then, in my ‘Monitoring’ process group, I have something that looks like this:

Screen Shot 2018-08-29 at 5.46.48 PM.png

You can notice I’m sending all the data to a ‘Technical Monitoring’ process group. That’s because the team in charge of administrating/monitoring the platform probably also wants to have a look at the monitoring data to ensure the full system is performing OK (common examples: sending the bulletins to an external tool like ES/Kibana, sending email alerts in case back pressure is enabled on a connection, etc.).

The upper part of the workflow is where I’m processing the monitoring data to split it and route it by project. First, I’m using a RouteOnAttribute processor to route the data based on its type (bulletins, controller status, etc.):

Screen Shot 2018-08-29 at 5.50.48 PM

Then I’m using an UpdateRecord processor to add a ‘project’ field that I’m computing based on the component name.

For bulletins:

Screen Shot 2018-08-29 at 5.55.34 PM

For controller status:

Screen Shot 2018-08-29 at 5.52.43 PM.png

Since the data has been sent in Avro format by the reporting task, I’m using an Avro Reader Controller Service with default settings. And, from now on, I want the data to be in JSON (as it’ll be easier in case I want to send the data via an email or via HTTP). I’m configuring a few controller services: one Avro Schema Registry containing the schemas (that you can retrieve in the additional details of the reporting task documentation, see below), one JSON Reader and one JSON Writer.

For instance, to get the schema of the S2S Bulletin Reporting Task, go on the reporting task list:

Screen Shot 2018-08-29 at 5.59.40 PM.png

Click on the documentation icon on the left:

Screen Shot 2018-08-29 at 6.00.21 PM.png

And then click Additional Details:

Screen Shot 2018-08-29 at 6.00.58 PM.png

When adding the schemas in the Avro Schema Registry, use the type of the reporting task as the name of the schema and don’t forget to add a ‘project’ field in the schemas:

Screen Shot 2018-08-29 at 6.02.28 PM.png

Now you can easily reference the schemas to use in the configuration of the JSON Reader/Writer using the expression language because the flow files generated by the reporting tasks have an attribute with the type of the source reporting task.

Example of attributes on a flow file generated by a RT:

Screen Shot 2018-08-29 at 6.28.56 PM.png

JSON Reader:

Screen Shot 2018-08-29 at 6.03.44 PM.png

JSON Writer:

Screen Shot 2018-08-29 at 6.04.13 PM.png

Now I have a ‘project’ field in my data containing the project name (provided the naming convention has been correctly used), and I can use a QueryRecord processor to route the data for each project:

Screen Shot 2018-08-29 at 6.07.17 PM.png

And we can confirm that only the data for project A is routed to the ‘Monitoring Workflow’ for project A. Looking at the data in the relationship routing data for PJA:

Screen Shot 2018-08-29 at 6.08.53 PM.png

Now, when deploying a new project for the first time (i.e. importing the process group of the “core workflow”), you just need to update the QueryRecord processor to add a dynamic property for the new project as well as deploy the process group for the “monitoring workflow” of the project. In combination with the NiFi Registry, it makes things really smooth when you need to update the workflows with a new version.

I’m not going to develop the monitoring workflows for PJA and PJB, but it’d only be a matter of a few record processors to filter the data (bulletin? back pressure enabled?) and then a processor to send the data (ElasticSearch, PutEmail, InvokeHttp, etc.):

  • “send an email when a bulletin is generated” – RouteOnAttribute to get the bulletins data and PutEmail to send an email
  • “monitor the number of valid/invalid XML files being processed” – filter the controller status statistics on the ‘Connection’ elements and keep only the statistics of the connections after the ValidateRecord processor, this can be easily done with a QueryRecord processor. The numbers can then be sent to an external system such as ElasticSearch for a dashboard in Kibana/Grafana
  • “send an email when back pressure is enabled” – use a QueryRecord processor on the controller status data by querying the field ‘isBackPressureEnabled’ and then use PutEmail processor.

The full workflow of this example is available as a gist here. Note that, since it’s a template, it does not contain the Reporting Tasks configuration. But you should be up and running quite quickly with the above screenshots.

As always, thanks for reading this post, feel free to comment and/or ask questions. In case you have suggestions to make things even better, just let me know and, maybe, I’ll have to publish a new version of this post to present the new features of a next release 😉

NiFi 1.7+ – Terminate threads

One of the new features coming with NiFi 1.7.0 is the possibility to terminate stuck threads directly from the UI. Before this release, when you had a processor getting stuck (like a custom processor with a deadlock) you had no option but to restart NiFi… and that’s not really great in a multi-tenant setup.

Let’s see this new feature with an example: I’m using a GenerateFlowFile and, using the debug mode of my IDE, I’m going to simulate an issue with the thread by adding a break point in the onTrigger() method of the processor.

Screen Shot 2018-07-02 at 2.56.57 PM.png

When the processor is running, we can see the number of threads currently used by the processor (top right). If the number of tasks is not increasing (and equal to 0 after 5 minutes) and the number of threads is constant, it probably means you have a stuck thread. In this case, it is always recommended to do a thread dump of NiFi to see what is going on. To do that:

./bin/nifi.sh dump /tmp/thread-dump.txt

If we look at the content of the generated file and search for my GenerateFlowFile processor, we can see something like this:

"Timer-Driven Process Thread-5" Id=57 RUNNABLE (suspended)
 at org.apache.nifi.processors.standard.GenerateFlowFile.onTrigger(GenerateFlowFile.java:210)
 at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
 at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
 at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
 at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 Number of Locked Synchronizers: 1
 - java.util.concurrent.ThreadPoolExecutor$Worker@2388307

In this case, the thread is shown as “suspended” at the line where I put my break point in my IDE.

Note that having a stuck thread is usually a symptom of a badly designed processor or an underlying issue with the code dependencies of the processor. Having the thread dump can help locate and fix the issue.

If trying to stop the processor:

Screen Shot 2018-07-02 at 3.02.23 PM.png

I’ll now see two threads being used by the processor:

Screen Shot 2018-07-02 at 3.03.09 PM

The processor is now in the process of being stopped but you won’t be able to update or restart the processor until it is actually stopped. However, if the initial thread is stuck, you’ll remain in this state forever (and, eventually, you’ll have to restart NiFi if you’re running a version below 1.7.0).

With NiFi 1.7.0, you now have the possibility to terminate the thread (meaning you don’t have to wait for the thread to be completely stopped):

Screen Shot 2018-07-02 at 3.06.58 PM

If I terminate the thread, here is what I’ll see:

Screen Shot 2018-07-02 at 3.08.07 PM

Meaning there is no more active thread and there is one thread being terminated.

Under the hood, the framework is issuing an “interrupt” for the thread and will perform a “reload” of the processor, meaning a new instance of the processor class is created. This allows the old class to eventually shut down gracefully, close connections, etc. However, this needs to be used with care: if the class is maintaining some values in internal variables, this information will be lost in the process (I’m not talking about state information that can be saved at framework level by the processors).

An interrupt is an indication to a thread that it should stop what it is doing and do something else. It’s up to the programmer to decide exactly how a thread responds to an interrupt, but it is very common for the thread to terminate. However, if for some reason the thread does not respond to the interrupt, it will stay in the terminated state forever (in my example above, I can see the thread staying in that state because of my break point).

You can now update the configuration and start the processor again (even while a thread is being terminated). Once the thread is terminated, you’ll get back to a nominal situation:

Screen Shot 2018-07-02 at 4.54.51 PM.png

If you don’t want to set up a debug environment, you can simulate a stuck thread with the following ExecuteScript processor:

Screen Shot 2018-07-02 at 4.56.49 PM.png

If starting/stopping the processor:

Screen Shot 2018-07-02 at 6.02.37 PM.png

And I’ll get the following message when terminating the thread:

Screen Shot 2018-07-02 at 6.02.58 PM.png

You can also use the DebugFlow processor with “@OnStopped Pause Time” set to something like “5 min” and “Ignore Interrupts When Paused” set to “true”:

Screen Shot 2018-07-02 at 6.49.29 PM.png

That’s it for this post! This new feature is really nice in a multi-tenant environment where you can’t afford a restart of the service. If you’re in a situation where you need to use this feature, remember to take a thread dump before actually terminating the thread. This will be really useful to investigate the issue. Also, remember that terminating a thread is not a “normal” operation: it means there is something wrong somewhere and it could very likely happen again.

Thanks for reading this post and, as always, feel free to comment / ask questions!

NiFi 1.7+ – XML Reader/Writer and ForkRecord processor

Starting with NiFi 1.7.0 and thanks to the work done by Johannes Peter on NIFI-4185 and NIFI-5113, it’s now possible to use an XML reader and writer in the Record processors to help you process XML data. Before that, you had a few options requiring a bit of additional work to get things working (see here).

I won’t go into the details because the reader/writer are really well documented (have a look at the additional details for examples).

Also, with NIFI-4227, a ForkRecord processor is now available to “explode” your data when you have one or multiple embedded arrays in your data set and you need to normalize your data. Have a look at the ForkRecord documentation (additional details) for examples.

Example – Introduction

Let’s go through an example using the XML Reader and the ForkRecord processor. I assume I’m receiving XML data with the following schema:

Screen Shot 2018-06-27 at 10.39.20 AM.png

And here is a dummy file I’m receiving that I’ll use for this example:

Screen Shot 2018-06-27 at 2.15.28 PM.png

The corresponding Avro schema can be found here. Defining the Avro schema corresponding to your data is the most “difficult” part, but once it’s done, everything else is really easy with the Record processors. You can get some help using the InferAvroSchema processor, but it should only be used to get an initial version of your Avro schema when you start developing your workflows (it should not be used in your production workflows).

I’m going to quickly explain the following workflow (template available here):

Screen Shot 2018-06-27 at 2.24.43 PM.png

Example – XML to JSON conversion

The GenerateFlowFile is used to generate my XML data and to send the content to the Record processors I’m using. The upper part of the workflow is just a ConvertRecord processor to perform the XML to JSON conversion thanks to the schema. The other parts of the workflow are used to extract each “object” of my schema (customer, address, account and transaction).

Here are the configuration details for the XML to JSON conversion. The GenerateFlowFile is very simple:

Screen Shot 2018-06-28 at 2.25.23 PM

Note the attribute ‘schema.name’ set with the value of the schema I added in my Avro schema registry controller service:

Screen Shot 2018-06-28 at 2.26.24 PM.png

The XML Reader is configured to get the schema name from the ‘schema.name’ attribute of the processed flow files:

Screen Shot 2018-06-28 at 2.27.21 PM.png

The JSON writer does not contain any specific configuration; it’ll write the data using the schema inherited at the reader level:

Screen Shot 2018-06-28 at 2.28.22 PM

The ConvertRecord processor can now be configured for the XML to JSON conversion:

Screen Shot 2018-06-28 at 2.29.20 PM.png

That’s it! You now have a very easy and powerful way to perform your XML to JSON conversion. Here is the flow file content before the processor:

Screen Shot 2018-06-27 at 2.15.28 PM

Here is the JSON generated by the processor:

Screen Shot 2018-06-28 at 2.31.23 PM

If you need to manipulate your records to go from one schema to another, you can use the UpdateRecord processor.

Note – using the XML Reader/Writer and Record processors, you can expect performance as good as the best options presented in my previous post about XML data processing.

Example – ForkRecord processor

The goal here is to extract the data contained in the arrays into separate CSV files. From one XML file, I want to generate 4 CSV files that I could use, for instance, to send data into database tables.

Let’s start with the first one regarding customer data. I’m adding a ‘customer’ Avro schema in the Avro schema registry:

Screen Shot 2018-06-28 at 2.42.58 PM.png

I’m using an UpdateAttribute to set the ‘output.schema’ attribute with the name of the schema that I want the CSV Record Writer to use. Then, I’m just using a ConvertRecord processor to convert from XML to CSV and I only keep the root-level fields (customer data). Here is the configuration of my CSV writer:

Screen Shot 2018-06-28 at 2.45.23 PM.png

Here is the output when processing my XML data:

Screen Shot 2018-06-28 at 2.46.32 PM.png

Now, I want to extract the data contained in the arrays, and that’s where the ForkRecord processor is useful. To use it, I need to specify a custom property containing the Record path to the array that the processor should process. There are two modes for the processor:

  • Split – that will preserve the input schema but will create one flow file per element contained in the array.
  • Extract – that will generate flow files using the elements of the arrays, with the possibility to include the parent fields up to the root level in case you want to keep some fields as keys/identifiers of your array elements. Note that you can define which parent fields you want to preserve by setting the appropriate schema with only the fields you want.

In our case we’re using the Extract mode and we are including the parent fields: for the ‘address’ data for instance, we want to keep the customer_id field in the output (that field would be used as a key between the customer data and the address data in the database world). Here is the schema I define in my Avro schema registry for the ‘address’ object:

Screen Shot 2018-06-28 at 2.52.03 PM

I’m using an UpdateAttribute to set the attributes I’m using in my reader and the ForkRecord processor:

Screen Shot 2018-06-28 at 2.55.04 PM

And I’m configuring the ForkRecord processor to use the ‘fork.path’ attribute defining the array of elements to be processed:

Screen Shot 2018-06-28 at 2.55.59 PM.png

Note – if you have multiple Record paths pointing to multiple arrays with elements using the same output schema, you can define as many custom properties as you want.

And here is the output after the ForkRecord processor:

Screen Shot 2018-06-28 at 2.56.40 PM

Here are the Record paths I’m setting for the other types of objects I want to extract:

  • Account – fork.path = /accounts/account
  • Transaction – fork.path = /accounts/account[*]/transactions/transaction
    In this case, I need to specify account[*] because I want to access all the transactions of all the elements in the account array. But I could use the Record path features to only access some specific accounts based on some predicates.

And here the output I get (after defining the appropriate schema in my schema registry):

  • Account data (I only keep the customer_id from the parent fields):

Screen Shot 2018-06-28 at 3.02.14 PM.png

  • Transaction data (I only keep the customer_id and account_id from the parent fields):

Screen Shot 2018-06-28 at 3.03.55 PM

Conclusion

The XML Reader & Writer are a really nice addition to NiFi if you need to process XML data and they will make your existing workflows much simpler! You really have to look at the Record processors and concepts as soon as you have a use case manipulating data that complies with a schema.

The ForkRecord processor is a new processor that you can use if you need to manipulate schema oriented data containing a lot of arrays. That can be useful in case you need to normalize data before ingestion into databases.

Thanks for reading this post and, as always, feel free to comment and/or ask questions!