Running visual quality inspection at the edge with Google Cloud and Apache NiFi & MiNiFi

All of my NiFi related content in one place

On the 23rd of October 2019, I gave a talk at the Apache Con in Berlin about running visual quality inspection at the edge. This is the story describing the talk and helping you use both Google Cloud and Apache NiFi & MiNiFi to continuously run updated TensorFlow models at the edge.


Picture this: you are in a factory making cookies… you have thousands of cookies going through your production lines every day and you want to make your customers happy. Problem is… a broken cookie makes an unhappy customer! So you want a way to detect the broken cookies before they get into the final packaging.

This is the example I am choosing to support this story but it could really be anything related to visual quality inspection which spans across multiple industries such as retail, manufacturing, energy, etc. Anything that you can think of that would need a ML model trained over a dataset of images.

What is Apache NiFi & MiNiFi?

Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data. Apache NiFi supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic. MiNiFi — a subproject of Apache NiFi — is a complementary data collection approach that supplements the core tenets of NiFi in data flow management, focusing on the collection of data at the source of its creation.

In simpler words, Apache NiFi and MiNiFi provide a great combination of tools and features to collect and move data around, process it, clean it and integrate it with other systems. As soon as you need to bring data in, you want to use Apache NiFi.

What is Google Cloud Vision?

Google Cloud Vision allows you to derive insights from your images in the cloud or at the edge with AutoML Vision or use pre-trained Vision API models to detect emotion, understand text, and more.

With Google Cloud Vision, you have two options:

  • AutoML Vision: automate the training of your own custom machine learning models. Simply upload images and train custom image models with AutoML Vision’s easy-to-use graphical interface; optimize your models for accuracy, latency, and size; and export them to your application in the cloud, or to an array of devices at the edge.
  • Vision API: Google Cloud’s Vision API offers powerful pre-trained machine learning models through REST and RPC APIs. The APIs cover: image classification and labelling into millions of predefined categories, detect objects and faces, read printed and handwritten text, build valuable metadata into your image catalog, moderate content, etc.

In this story we are going to build our own custom machine learning models with our own labels and images using AutoML Vision.

Objective: continuous model retrain

![Automatic retrain of the custom model](/2019/2019-10-29-running-visual-quality-inspection-at-the-edge-with-google-cloud-and-apache-nifi-minifi/images/cb517-1hz5tqhzqjtt-dg6rj1ywua.png)
**_Automatic retrain of the custom model_**

The objective is to have an end-to-end system to detect the defects while having a continuous model retrain with the newly collected images. Key benefits are:

  • Automatically train customized ML models in the cloud
  • Efficiently acquire images, label images, deploy model and run inference
  • Continuously refresh models using fresh data from the production lines
  • Minimal human interaction
  • ML model accuracy improving over time

Using a Raspberry Pi at the edge

To collect data from the production lines, I’m going to use a Raspberry Pi on which I’ll be running Apache MiNiFi. I added a camera to take pictures as well as LEDs to give visual indications of what is currently being processed.

![Raspberry Pi 3 used to collect data](/2019/2019-10-29-running-visual-quality-inspection-at-the-edge-with-google-cloud-and-apache-nifi-minifi/images/3c230-1ckf6mepetymmyas60zz4va.png)
**_Raspberry Pi 3 used to collect data_**

In order to collect data from the edge with the Raspberry Pi, I’m going to use Google IoT Core which is a fully managed service to easily and securely connect, manage, and ingest data from globally dispersed devices.

First step is to create a Device Registry which is a container of devices with shared properties. Once the Registry is created, you can register your device with its device identity. To do that you need to generate a public/private key pair that will be used by the device to authenticate against Google IoT Core (you can also use Certificate Authorities). Here are the commands I used to generate my key pair for the Raspberry Pi.

![My device is registered in my device registry](*ue5wsQYmVib5NKCa)
**_My device is registered in my device registry_**

Then I’ll be using the MQTT protocol to send data between my device and Google Cloud, it will automatically make the data available into a Pub/Sub topic:

![Interface between the device and Google Cloud IoT Core](/2019/2019-10-29-running-visual-quality-inspection-at-the-edge-with-google-cloud-and-apache-nifi-minifi/images/dad11-1e-sdkiah-gddmjfwunvabq.png)
**_Interface between the device and Google Cloud IoT Core_**

To communicate with Google Cloud IoT Core through MQTT, I’ll be using a processor that is currently under code review in a Pull Request on Github. I’ll also make this processor available in a repo I created to share the code snippets of my talk.

One problem, two architectures

When we’ll have collected pictures from our devices, we’ll be able to train our custom model. Then we’ll have to choose between two options:

  • Deploy the model in Google Cloud and do the inference in the cloud. It means that for each picture, the Raspberry Pi with Apache MiNiFi will make an HTTPS call to the served model to get the label of the picture. While this is efficient and is easy to implement and scale, this means making a call over the Internet and a longer inference time. In this story that what I’ll call the mode “cloud” when configuring the device. In the cloud mode, here is what the architecture looks like:
![Architecture in the “cloud” mode](/2019/2019-10-29-running-visual-quality-inspection-at-the-edge-with-google-cloud-and-apache-nifi-minifi/images/42fb2-1ww5lopg8fxhzicn0chdl9a.png)
**_Architecture in the “cloud” mode_**
  • Download the model from Google Cloud on the Raspberry Pi and do the inference at the edge using Apache MiNiFi. Google Cloud provides multiple options to download your custom models with various accuracy versus latency parameters. Running the model on the edge will lead to much faster inference time. In this story that’s what I’ll call the mode “edge” when configuring the device. In the edge mode, here is what the architecture looks like:
![Architecture in the “edge” mode](/2019/2019-10-29-running-visual-quality-inspection-at-the-edge-with-google-cloud-and-apache-nifi-minifi/images/5a5f0-1yzcfapavemndd4sv9a9tta.png)
**_Architecture in the “edge” mode_**

We’ll also see that, in the edge mode, it’s possible to get an even better inference time using Google’s hardware with the Coral Edge TPU.

Collecting pictures

Here is what the workflow running in MiNiFi (on the Raspberry Pi) looks like:

![Workflow running in MiNiFi on the Raspberry Pi to collect images](/2019/2019-10-29-running-visual-quality-inspection-at-the-edge-with-google-cloud-and-apache-nifi-minifi/images/e9b52-1fuovdaysyy0kgyjwziujya.png)
**_Workflow running in MiNiFi on the Raspberry Pi to collect images_**

The workflow is really simple: a “standalone” processor executes a Python script to take pictures at a given frequency using the Pi camera. Then the pictures are fetched into MiNiFi to be sent to Google Cloud IoT Core through the MQTT processor. I’m also switching on/off the amber LED when this process occurs to give a visual indication on the Pi about what’s going on.

As we said, the data will be automatically sent to a Pub/Sub topic and made available for consumption. In the Google Cloud Platform, I’m running a standalone secured Apache NiFi instance (see my previous post for a very quick deployment using Terraform) on which the below workflow is running:

![Workflow running in NiFi in GCP to ingest images](/2019/2019-10-29-running-visual-quality-inspection-at-the-edge-with-google-cloud-and-apache-nifi-minifi/images/775c0-1rhvxdhbjiberznmpadaroa.png)
**_Workflow running in NiFi in GCP to ingest images_**

The pictures are pulled from my topic’s subscription, then the images are stored in Google Cloud Storage and I’m using the Google Cloud Vision API to add the pictures into my dataset (which is the collection of images I’m using to train my custom models).

I won’t go too much into the details as the documentation is self-explanatory but here is the call I’m making — it gives the GCS path of the CSV file I created listing the GCS paths of all the images I ingested during the last X minutes:

curl \
  -X POST \
  -H "Authorization: Bearer $(gcloud auth application-default print-access-token)" \
  -H "Content-Type: application/json" \${projectID}/locations/us-central1/datasets/${datasetID}:importData \
  -d '{
        "inputConfig": {
          "gcsSource": {
            "inputUris": "gs://${gcs.bucket}/dataset.csv"

Note 1: the dataset ID is something looking like: ICN4695798657952251904. Note 2: in NiFi, I externalized a lot of variables (project ID, GCS bucket, dataset ID, etc) that I reuse across the workflow to make it much more easier to use and configure.

Managing your device configuration

Once your device is registered in the Device Registry in Google Cloud IoT Core, you can use the UI to send configuration updates or commands to your devices. Here is what it looks like:

![Updating the configuration of my device through MQTT](*w2c98GNTjB9soGaS)
**_Updating the configuration of my device through MQTT_**

On the Raspberry Pi, in MiNiFi, the configuration updates and commands are received by the MQTT processor and I process the received data to update the configuration of my device:

![Reception of configuration updates and commands](*Qa5wvn1bZZ45YAAx)
**_Reception of configuration updates and commands_**
![Processing configuration updates](/2019/2019-10-29-running-visual-quality-inspection-at-the-edge-with-google-cloud-and-apache-nifi-minifi/images/84901-1rhqnxjl_l2v02phrfevmra.png)
**_Processing configuration updates_**

When I receive a configuration update, I process the JSON payload to extract the mode and if I should use the Edge TPU or not, I then store this information into a local cache that is also persisted on disk in case of device restart.

Training and serving your model

We now have a device that we are able to configure from anywhere, that is automatically sending pictures from our production lines, and the pictures are automatically stored and added to our Google Cloud Vision dataset to train our custom models.

The next step, in Google Cloud Vision, is to create labels that will be used to classify our images. In this case, I only create two labels:

  • OK: the cookie looks good and can go into the packaging
  • NOK: the cookie is broken and should be removed

In the UI, I’m able to manually label the images and I need, at least, 10 images per label to start training my custom model (the more you have, the better the results are, and that’s why we want to continually ingest new images and retrain our model over time):

![My dataset with the pictures of my cookies and my labels](*IzPnIwbrbjGT_NoA)
**_My dataset with the pictures of my cookies and my labels_**

In my NiFi instance running in GCP, I created a workflow which is used to trigger a model training every day in order to take into account the newly captured images and improve the model’s accuracy over time:

![Model training is triggered daily in NiFi](/2019/2019-10-29-running-visual-quality-inspection-at-the-edge-with-google-cloud-and-apache-nifi-minifi/images/cf796-1hgn9hgjc4dwlegjyxssymw.png)
**_Model training is triggered daily in NiFi_**

To start the custom model training and wait until the training is completed I’m using the REST APIs described in the documentation. However, at this stage, you have to specify if you want to train a model that will run in the cloud (cloud mode) or at the edge (edge mode):

When training a model to be exported at the edge, you can specify a parameter allowing you to choose if you prefer latency over accuracy:

  • mobile-low-latency-1 for low latency,
  • mobile-versatile-1 for general purpose usage, or
  • mobile-high-accuracy-1 for higher prediction quality.

When a cloud-hosted model is being trained, the Google Cloud Vision UI would look like this:

![Cloud-hosted model currently being trained](*tZzpnmyAGrB-UxJQ)
**_Cloud-hosted model currently being trained_**

Once the model training is completed, NiFi will receive a JSON payload such as:

![JSON payload when model training is completed](*jgBpjdmBPi17I_t0)
**_JSON payload when model training is completed_**

Once the model is trained, we can access a lot of information regarding the model training and its accuracy compared to the provided dataset:

![Custom model information about precision, recall, etc](*Xg6-TkAvG4Bvbnn4)
**_Custom model information about precision, recall, etc_**

Cloud-hosted model

Once the model is trained, we can call the REST APIs to deploy the model. Here is the part of the NiFi workflow in charge of this:

![NiFi deploying the cloud-hostel custom model](/2019/2019-10-29-running-visual-quality-inspection-at-the-edge-with-google-cloud-and-apache-nifi-minifi/images/825ff-16hogaxr4mi0vuwqhged7pw.png)
**_NiFi deploying the cloud-hostel custom model_**

Once the deployment is completed, the model will be automatically exposed through a REST API in Google Cloud (you don’t have to worry about the how and the where, Google takes care of it). The only information you need is the model ID and this is the information we are sending to the Raspberry Pi using the Google IoT Core commands with the ExecuteStreamCommand processor:

gcloud iot devices commands send \
    --region=REGION  \
    --registry=REGISTRY_ID \

In MiNiFi, we are receiving the command and storing the model ID in the cache:

![Receiving the model ID through the Google IoT Core MQTT command](/2019/2019-10-29-running-visual-quality-inspection-at-the-edge-with-google-cloud-and-apache-nifi-minifi/images/1bae8-1d6ricpiu-8-v3jg1pnockg.png)
**_Receiving the model ID through the Google IoT Core MQTT command_**

We can now perform the inference for each picture we capture by making an HTTPS call against the exposed API:

![Inference in the cloud using the exposed API](/2019/2019-10-29-running-visual-quality-inspection-at-the-edge-with-google-cloud-and-apache-nifi-minifi/images/11f1d-10y0udqbahwsw4wmxaevoyg.png)
**_Inference in the cloud using the exposed API_**

Note the RouteOnAttribute processor that will check in which mode the device is configured. In this case, the device is configured in cloud mode.

The picture payload needs to be base64 encoded and the API to use is:

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth application-default print-access-token)" \
  -H "Content-Type: application/json" \${PROJECT_ID}/locations/us-central1/models/${MODEL_ID}:predict \
  -d '{
        "payload" : {
          "image": {
            "imageBytes" : "/9j/4AAQSkZJRgABAQAAAQ … "

Edge exportable model

In this mode, once the TensorFlow model is trained, we just need to make an API call to export the trained model into Google Cloud Storage:

![API calls to export the custom edge model into Google Cloud Storage](/2019/2019-10-29-running-visual-quality-inspection-at-the-edge-with-google-cloud-and-apache-nifi-minifi/images/26cbf-10hy2axig5bi9rj0twdpeww.png)
**_API calls to export the custom edge model into Google Cloud Storage_**

The API to use is:

curl \
  -H "Authorization: Bearer $(gcloud auth application-default print-access-token)" \
  -H "Content-Type: application/json" \
  https://${ENDPOINT}/v1beta1/projects/${PROJECT_ID}/locations/us-central1/models/${MODEL_ID}:export \
  -d '{
        "output_config": {
          "model_format": "tflite",
          "gcs_destination": {
              "output_uri_prefix": "${USER_GCS_PATH}"

In case you want to export a model that is optimised for the Edge TPU (we’ll talk about this in a bit), then you have to use:

curl \
  -H "Authorization: Bearer $(gcloud auth application-default print-access-token)" \
  -H "Content-Type: application/json" \
  https://${ENDPOINT}/v1beta1/projects/${PROJECT_ID}/locations/us-central1/models/${MODEL_ID}:export \
  -d '{
        "output_config": {
          "model_format": "edgetpu_tflite",
          "gcs_destination": {
              "output_uri_prefix": "${USER_GCS_PATH}"

With this API call, NiFi will receive a JSON payload with the GCS path to the tflite file representing your custom model. Here again we are using a Google Cloud IoT Core command to send this information to our device through MQTT:

gcloud iot devices commands send \
    --region=REGION  \
    --registry=REGISTRY_ID \

On MiNiFi, we are receiving this information and downloading the model directly on the device so we can do local inference:

![Receiving the GCS path of the model through MQTT and downloading the model locally](/2019/2019-10-29-running-visual-quality-inspection-at-the-edge-with-google-cloud-and-apache-nifi-minifi/images/65c50-1g7waudrpbvoorqllznic-q.png)
**_Receiving the GCS path of the model through MQTT and downloading the model locally_**

Now we have the TensorFlow lite model downloaded on the Raspberry Pi, we can perform the inference on every captured image:

![Inference on the edge using the downloaded TensorFlow lite model](/2019/2019-10-29-running-visual-quality-inspection-at-the-edge-with-google-cloud-and-apache-nifi-minifi/images/8241c-1rmfcjjdlwn-uubdz-xcdfq.png)
**_Inference on the edge using the downloaded TensorFlow lite model_**

To execute the model locally, I’m executing a Python script that you can find here.

Boosting your device with the Coral Edge TPU

Since the 22nd of October 2019, Coral is now GA! Coral is a hardware and software platform for building intelligent devices with fast neural network inferencing.

At the heart of the devices is the Edge TPU coprocessor. This is a small ASIC built by Google that’s specially-designed to execute state-of-the-art neural networks at high speed, with a low power cost. The Edge TPU is capable of performing 4 trillion operations (tera-operations) per second (TOPS), using 0.5 watts for each TOPS (2 TOPS per watt).

![Edge TPU benchmark by Google](/2019/2019-10-29-running-visual-quality-inspection-at-the-edge-with-google-cloud-and-apache-nifi-minifi/images/9f32b-1dt_4gqonsrieldqdkqgbdq.png)
**_Edge TPU benchmark by Google_**

In my use case, I’m using the USB accelerator that I’m directly plugging to the Raspberry Pi.

![Coral USB accelerator](/2019/2019-10-29-running-visual-quality-inspection-at-the-edge-with-google-cloud-and-apache-nifi-minifi/images/00f9d-1kodawepknrnqrbkyzdn6hg.png)
**_Coral USB accelerator_**

Using the custom ML models I trained on Google Cloud Vision, I can easily compare the TensorFlow “invoke” time when using or not the Edge TPU:

  • When not using the Edge TPU optimised model:
![TF Lite invoke time when not using the Edge TPU — about 127ms/image](*-AoOL-oyScgfyQhZ)
**_TF Lite invoke time when not using the Edge TPU — about 127ms/image_**
  • When using the Edge TPU and the optimised model:
![TF Lite invoke time when using the Edge TPU — about 9ms/image](*ABOTnT74Mwj-aoG-)
**_TF Lite invoke time when using the Edge TPU — about 9ms/image_**

We get about 127ms without using the Edge TPU while we only need 9ms when using the Edge TPU on a low latency optimised model generated for my cookies use case with Google Cloud Vision AutoML.

Preliminary results

So far we have sent the pictures through MQTT and performed the classification inference into two distinct branches of the MiNiFi workflow (in other words: pictures ingestion and pictures inference are done in parallel). Here are the results we get (processing time is the duration between the moment we take the picture and the moment we get the label and the confidence score of the classification ; inference time is the duration of the “invoke” call to get the classification using the TensorFlow Lite model):

  • cloud mode (inference in the cloud through HTTPS):
    - Processing time: about 6 seconds per image
    - Inference time: about 2.5 seconds per image
  • edge mode (without the Edge TPU):
    - Processing time: about 750 milliseconds per image
    - Inference time: about 127 milliseconds per image
  • edge mode + TPU(with the Edge TPU):
    - Processing time: about 500 milliseconds per image
    - Inference time: about 9 milliseconds per image

Monitoring and auto-labelling

What we have done so far is great but, once we made the inference on the Raspberry Pi, we take the action to keep or not the cookie on the production line (in my case, I switch on and off the red or green lights) and we forget about the inference results. The next step is to perform the inference before sending the pictures through MQTT so that we can send the inference results along with the pictures. Pictures inference and pictures ingestion are done sequentially. This has two main key benefits:

  • We can collect additional data to deploy monitoring dashboards in order to have accurate information about how our custom models are performing on our devices
  • We can introduce auto-labelling of the images: if the confidence score of the inferred label is over, for example, 90%, we could auto-label the image when ingesting it into the Google Cloud Vision dataset so that minimal human interaction is required when it comes to labelling: only the images representing new types of defects might need human labelling.

I won’t go into the details of the required changes in the workflows but on MiNiFi it looks like this:

![MiNiFi workflow when sending inference results along with pictures through MQTT](*zopgZx_ie0YhdJi0)
**_MiNiFi workflow when sending inference results along with pictures through MQTT_**

Once we receive all the information in NiFi, we can add this information into Stackdriver monitoring dashboards and see in real-time how our custom models are performing:

![Processing time and score for cloud mode versus edge mode](/2019/2019-10-29-running-visual-quality-inspection-at-the-edge-with-google-cloud-and-apache-nifi-minifi/images/a5111-1rrnoevwi9sonrc7k0stbta.png)
**_Processing time and score for cloud mode versus edge mode_**

In the above picture we can see how the processing time and the confidence score is changing while going from the cloud mode to the edge mode after updating the device configuration. We clearly see that the processing is much faster but we are losing a little bit in accuracy (still above 90% though!).

We can also compare the results when using the Edge TPU or not while the device is configured in edge mode:

![Processing time and score for edge mode with and without Edge TPU](/2019/2019-10-29-running-visual-quality-inspection-at-the-edge-with-google-cloud-and-apache-nifi-minifi/images/a421c-1cboarhgpmfuwwe5qf_iowa.png)
**_Processing time and score for edge mode with and without Edge TPU_**

In the above picture,we can see an interesting result: when using the Edge TPU with the Edge TPU optimised model, we get a lower processing time while also getting a higher confidence score for the inference. No reason not to use it!


First, thank you for reading this very long story… Then, here is the conclusion of it: thanks to Apache NiFi, MiNiFi, the products of the Google Cloud Platform and the Coral products, we implemented an efficient end-to-end solution to run custom ML models on the edge while continuously refresh our models with new data to improve the models accuracy over time with no code and minimal human interaction. I’ll leave it to your imagination to transpose all of this to your very own use case!

The slides of the talk, the recording as well as code snippets, custom processors, workflows, scripts, etc, will be added in the coming days on this Github repository. As usual, feel free to comment/ask questions.