Predicting flood risk using satellite data and machine learning

One of the last projects I was involved in at Capgemini was a study of the Data Cube technology (developed by Geoscience Australia) on behalf of CNES (the French space agency). Part of this analysis was to develop a demonstrator working on heterogeneous satellite data using this framework.

Based on work done by the scientific office at Capgemini Toulouse, we leveraged the Data Cube to manipulate Sentinel-1 images (radar data) and detect flooded areas.

Here is an example where flooded areas were detected after the monsoon in India a few months ago (purple shows permanent water areas, yellow shows flooded areas):

flood_india

In addition to this detection algorithm, Capgemini has been working with CESBIO (a biosphere study laboratory) to implement a system producing a global flood risk map. This map is mainly based on data provided by the SMOS satellite (products related to soil moisture on Earth).

This way, we have two parallel systems: one detecting flood events a posteriori, the other forecasting flood events a priori. The idea is to combine both kinds of data and to use the machine learning capabilities offered by Apache Spark to generate a new model with better forecasting capabilities in terms of resolution and accuracy.
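
To make the idea more concrete, here is a minimal sketch of what such a combination could look like with the Spark DataFrame and ML APIs (everything here, the file names, the columns and the choice of a logistic regression, is purely illustrative and not the actual project pipeline):

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder.appName("flood-risk-model").getOrCreate()

# Hypothetical inputs: flood detections derived from Sentinel-1 (a posteriori)
# and soil-moisture features derived from SMOS products (a priori)
detections = spark.read.parquet("s1_flood_detections.parquet")  # cell_id, date, flooded (0.0 or 1.0)
moisture = spark.read.parquet("smos_soil_moisture.parquet")     # cell_id, date, soil_moisture

# Combine both sources on a common spatial/temporal key
data = detections.join(moisture, ["cell_id", "date"])

# Past detections become labels, SMOS measurements become features
assembler = VectorAssembler(inputCols=["soil_moisture"], outputCol="features")
model = LogisticRegression(labelCol="flooded", featuresCol="features").fit(assembler.transform(data))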

The abstract of the presentation I gave at the Big Data from Space conference held in Tenerife in March 2016 is available on the official JRC publications repository (along with a lot of other very interesting subjects!) or directly here.

Get data from/to Dropbox using Apache NiFi

A few days ago, a question was asked on the mailing list about the possibility of retrieving data from a smartphone using Apache NiFi. One suggestion was to use a cloud sharing service as an intermediary, like Box, Dropbox, Google Drive, AWS, etc.

Obviously, solutions already exist to sync data from these services to your computer. But we could imagine using Apache NiFi to sync data between such services, or even better: to have an account on each of these services and use Apache NiFi as a single endpoint to manage all your accounts as one unified cloud storage facility. There are a lot of options out there.

For this post, let’s just use Apache NiFi to get data from/to a Dropbox account. I’ll let you use your imagination to develop amazing things!

First, I need to allow my account to be accessed by an external application. To do that, I go to this page and click the “Create App” button. Then I choose the “Dropbox API” and, in my case, I select the “App folder” access type (only one directory will be accessible to my application). Finally, I choose the name of my application, “NIFI”. This automatically creates a folder Applications/NIFI in my Dropbox (from the application’s point of view, this folder is the root directory of your Dropbox account: see it as “/”).

Now you need to generate an access token for your application. On your application’s page there is a link to generate the token. Once generated, keep it safe and to yourself!

OK! We have everything we need! Just to be sure everything is working, you can check with curl that you have access:

curl https://api.dropbox.com/1/account/info -H "Authorization:Bearer <YOUR-ACCESS-TOKEN>"

Note: here is a post about the new Dropbox API.

Now let’s jump to Apache NiFi to have a flow to automatically download files from the Dropbox folder.

1. List content in Dropbox folder

This can be achieved using:

curl -X POST https://api.dropboxapi.com/2-beta-2/files/list_folder \
    --header "Authorization: Bearer <access-token>" \
    --header "Content-Type: application/json" \
    --data "{\"path\": \"\"}"

With NiFi, we have to use the InvokeHTTP processor. At the time of writing, it is necessary to use an incoming FlowFile to set the content to be sent with a POST request. I will shortly open an issue about that and, hopefully, it will be possible to set the body directly in NiFi 0.7.0.

OK, so to set the content of the request to

{"path": ""}

a workaround is to use a local file on your computer and read it with a GetFile processor.

In short, to issue the request listing the content of the folder, we need:

list_files

The GetFile processor has the following properties:

  • regarding the scheduling, I set it to CRON driven with “*/5 * * * * ?” to issue the HTTP request every 5 seconds
  • regarding the properties:

get_file.PNG

You will notice that I reference a specific file in D:\tmp. This file is named list_files.json and its content is:

{"path": ""}

Also, I configure the processor not to delete the file (Keep Source File = true).
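
To sum up, the GetFile configuration presumably boils down to this (the File Filter value is an assumption matching my example):

Input Directory: D:\tmp
File Filter: list_files.json
Keep Source File: true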

Every 5 seconds, a FlowFile with the expected content will be sent to the InvokeHTTP processor which has the following properties:

list_invokehttp.png

I set the method to POST and the URL endpoint as specified by the Dropbox API. I manually set the Content-Type to “application/json” (the request body will be the content of the incoming FlowFile). And I manually add a property with the key “Authorization” and the value “Bearer <Access Token>” (to be replaced with your own access token). This adds the expected header to the request.
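
In other words, the relevant InvokeHTTP properties presumably boil down to the following (the Authorization entry being a dynamic property, sent as a request header):

HTTP Method: POST
Remote URL: https://api.dropboxapi.com/2-beta-2/files/list_folder
Content-Type: application/json
Authorization: Bearer <YOUR-ACCESS-TOKEN>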

That is it! At this point we are able to list the content of our Dropbox folder. The output of the InvokeHTTP processor (in the response relationship) will have content looking like:

{"entries": 
[
{".tag": "file", "name": "file_3.txt", "path_lower": "/file_3.txt", "path_display": "/file_3.txt", "id": "id:EoMhAg5eUKAAAAAAAAAAAQ", "client_modified": "2016-03-11T12:35:31Z", "server_modified": "2016-03-11T12:35:31Z", "rev": "145c5a2f7", "size": 13}, 
{".tag": "file", "name": "file_1.txt", "path_lower": "/file_1.txt", "path_display": "/file_1.txt", "id": "id:qNPLoI0buzAAAAAAAAAAAg", "client_modified": "2016-03-11T12:35:32Z", "server_modified": "2016-03-11T12:35:32Z", "rev": "245c5a2f7", "size": 13}, 
{".tag": "file", "name": "file_2.txt", "path_lower": "/file_2.txt", "path_display": "/file_2.txt", "id": "id:ib7z-SMEqYAAAAAAAAAAAQ", "client_modified": "2016-03-11T12:35:33Z", "server_modified": "2016-03-11T12:35:33Z", "rev": "345c5a2f7", "size": 13}
], 
"cursor": "AAG4qAdoMOQVhshkMwFchOjPUnMWuGIvrQkZMb3L1aFa9euMXonxsG0S0_RIfxfFxYMxoz2cKFqc3laWcyDdM5MixrJ3AZ4jAyebx5s70k69z6KrBTE_IUh4Vnd2UZCUIAA", "has_more": false}

We can see the three files I added to my Dropbox folder.

2. Prepare requests to download files

Let’s have a look at the request we have to send to download a file:

curl -X POST https://api.dropboxapi.com/2-beta-2/files/download \
    --header "Authorization: Bearer <access-token>" \
    --header "Dropbox-API-Arg: {\"path\": \"/cupcake.png\"}"

So, we have to split our FlowFile into one FlowFile per file we want to download. Let’s do that with a SplitJson processor with the following properties:

split_json
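
In essence, the only important property here is the JSON Path pointing at the array to split (assumed value, matching the “entries” array in the response above):

JsonPath Expression: $.entries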

As we saw, the response is a JSON document containing an array (named “entries”) with one element per file present in the Dropbox folder. This processor is designed to output one FlowFile per element of the JSON array located at the given JSON Path. At this point, we will have one FlowFile per file we want to download. Each one will have content like:

{".tag": "file", "name": "file_3.txt", "path_lower": "/file_3.txt", "path_display": "/file_3.txt", "id": "id:EoMhAg5eUKAAAAAAAAAAAQ", "client_modified": "2016-03-11T12:35:31Z", "server_modified": "2016-03-11T12:35:31Z", "rev": "145c5a2f7", "size": 13}

Now let’s take advantage of NiFi’s Expression Language: we want to extract some values from this JSON content and add them as attributes. For that, we use the EvaluateJsonPath processor with the following properties:

evaluate_json_path

In this case I extract the value associated with “path_lower” in the JSON and set it as an attribute with the key “path_lower”. For example, an output FlowFile will have an attribute with the key “path_lower” and the value “/file_3.txt”.

I also override the attribute “filename” (this attribute already exists since it is a core attribute in NiFi). So far this attribute contained “list_files.json”, inherited from my initial FlowFile at the very beginning of the flow. I change it to the name of the file I want to download (otherwise my files would all be downloaded with the name “list_files.json”).
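
Putting it together, the EvaluateJsonPath configuration is presumably along these lines (the two extractions are dynamic properties; “Destination” is set so that the results become attributes):

Destination: flowfile-attribute
path_lower: $.path_lower
filename: $.name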

The last thing to do before calling a new InvokeHTTP processor is to clear the FlowFile content. This is because the Dropbox API does not expect any content in the request and will return an error if there is any.

This is done with a ReplaceText processor with the following properties:

replace_text

Here, I just delete all the content of my FlowFile and only keep the attributes.
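
One way to achieve this (an assumed configuration; the screenshot may differ) is a regex matching the entire content, replaced by an empty string:

Search Value: (?s)(^.*$)
Replacement Value: (empty string)
Evaluation Mode: Entire text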

3. Download files

We are now ready to configure a last InvokeHTTP processor to download our files.

Note: the Dropbox API expects a POST request with no Content-Type, otherwise it returns an error. It is kind of a strange behavior, but no choice here… It was not possible, with the processor, to send a POST request without a Content-Type, so I filed a JIRA and proposed a fix that should be available in NiFi 0.6.0.

Here are the properties of the processor:

download_invokehttp

I set the method to POST and set the expected endpoint URL. I explicitly set the Content-Type to an empty string. And I manually add two header properties: one for authorization (to be replaced with your own access token) and one with the expected JSON (using the NiFi Expression Language to read the “path_lower” attribute we set earlier) to specify the file to download.
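
In short, the assumed configuration is (the last two entries being dynamic properties sent as headers, and ${path_lower} referencing the attribute set earlier):

HTTP Method: POST
Remote URL: https://api.dropboxapi.com/2-beta-2/files/download
Content-Type: (empty string)
Authorization: Bearer <YOUR-ACCESS-TOKEN>
Dropbox-API-Arg: {"path": "${path_lower}"}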

The final step is to add a PutFile processor to put the downloaded files in the desired location. Here are the properties:

putfile

That’s all! We now have a full flow ready to download files from your Dropbox account!

download_flow.PNG

4. Upload files

From this point, it is easy to apply the same logic and upload files based on the required request:

curl -X POST https://api.dropboxapi.com/2-beta-2/files/upload \
     --header "Authorization: Bearer <access-token>" \
     --header "Content-Type: application/octet-stream" \
     --header "Dropbox-API-Arg: {\"path\": \"/cupcake.png\", \"mode\": \"overwrite\"}" \
     --data-binary @local-file.png

We just need to link a similar GetFile processor (as described before) with a new InvokeHTTP processor with the following properties:

upload_file.png

This way, it will upload the incoming FlowFile with the correct file name at the root of your Dropbox directory (don’t forget to use your own Dropbox access token).
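
Based on the request above, the properties are presumably along these lines (Authorization and Dropbox-API-Arg being dynamic properties sent as headers, and ${filename} being the core attribute of the incoming FlowFile):

HTTP Method: POST
Remote URL: https://api.dropboxapi.com/2-beta-2/files/upload
Content-Type: application/octet-stream
Authorization: Bearer <YOUR-ACCESS-TOKEN>
Dropbox-API-Arg: {"path": "/${filename}", "mode": "overwrite"}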

In conclusion, for upload, the flow is as simple as:

upload_flow.PNG

That is it! I think you have everything you need to enjoy NiFi if you want to play with it to handle content on your Dropbox account.

Here is the template of the full upload/download flow (don’t forget to update it with your access token).

Feel free to add comments and ask questions about this post!

Transform data with Apache NiFi

A few days ago, I started to have a look at Apache NiFi, which is now part of the Hortonworks Data Flow (HDF) distribution. Based on my experience at Capgemini and the kinds of projects I have been involved in, I immediately realized that it is a powerful system that can be used in a wide range of situations and problems.

Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data. It supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic. Some of the high-level capabilities and objectives of Apache NiFi include:

  • Web-based user interface
    • Seamless experience between design, control, feedback, and monitoring
  • Highly configurable
    • Loss tolerant vs guaranteed delivery
    • Low latency vs high throughput
    • Dynamic prioritization
    • Flow can be modified at runtime
    • Back pressure
  • Data Provenance
    • Track dataflow from beginning to end
  • Designed for extension
    • Build your own processors and more
    • Enables rapid development and effective testing
  • Secure
    • SSL, SSH, HTTPS, encrypted content, etc…
    • Pluggable role-based authentication/authorization

It must also be noted that it is really easy to get started with NiFi (whatever OS you are using): just download it, run it, and open your favorite web browser at localhost:8080.

Now you may tell yourself that it is just a tool to get data from point A to point B according to parameters, conditions, etc. In other words, you may think that it is just a tool to extract and load data. What are the transformation features offered by NiFi?

Just have a look at the list of 125+ processors available at the moment and you will get a good idea of what you can achieve. But let’s say you need to perform some very specific actions on your data and you don’t see any processor suited to your needs. So what’s next?

First, you can write your own processor; it is very easy and straightforward (see the NiFi developer guide). Otherwise, you can leverage some existing processors to execute custom code. For example, you can achieve a lot with the ExecuteScript and ExecuteStreamCommand processors. If you are interested in the former, have a look at this blog for useful and complete examples.

In this post, I want to focus on ExecuteStreamCommand and how it can be used to define data transformation flows. One common use case I see is getting files from one place and executing an application to transform them.

For simplicity, let’s say I have an input directory into which files to be processed arrive. For each file A in this directory, I want to execute an application that transforms the data into a new file B:

flow_AtoB

This can be easily achieved by the combination of two processors: ListFiles and ExecuteStreamCommand.

Here is an example running on Windows: I look for any new file in “D:\tmp-input” with a ListFiles processor using the following properties:

listfiles

For each new file coming into this directory, the processor will generate a FlowFile (see the NiFi documentation to learn about NiFi principles) with some useful attributes and no content.

Now, I have a batch file that I want to be executed on each file. This batch file takes exactly one parameter: the path of the file to be processed.

My batch file (command.bat) is very simple: it moves the given file into another directory (obviously, this is just an example; if we just wanted to move files, NiFi could do that without executing commands!):

@echo off
REM Move the file given as the first argument into D:\tmp
MOVE %1 "D:\tmp" >nul

Then my ExecuteStreamCommand will have the following properties:

executeStreamCommand

Using the Expression Language provided with NiFi, I can rebuild the path of the file previously listed by the ListFiles processor by extracting information from the FlowFile attributes:

${absolute.path}${filename}

This is the concatenation of the attributes “absolute.path” and “filename” of the incoming FlowFile (attributes set by the ListFiles processor).

The processor can send the content of the incoming FlowFile to the executed process, but in my case there is no content and I don’t want that anyway (Ignore STDIN = true). Besides, this processor can create a new FlowFile using the output of the command as the content of the newly created FlowFile. This is something I don’t want with this command, so I set the property “Output Destination Attribute” to the value “result”. This way, the output of my command is stored in a new (or updated) attribute of my original FlowFile.
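
To sum up, the processor configuration is presumably along these lines (assuming the batch file sits at D:\command.bat):

Command Path: D:\command.bat
Command Arguments: ${absolute.path}${filename}
Ignore STDIN: true
Output Destination Attribute: result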

If you don’t need this FlowFile anymore, you can auto-terminate the processor, and you have a ready-to-go flow for processing files using your own command.

If you want another process to run on the newly created file (in D:\tmp), you can copy/paste this flow logic and use another ListFiles processor to scan the directory.

basic_flow.PNG

So far, this is a very basic operation. Something closer to a real use case would be a flow where the ExecuteStreamCommand processor gets information back from the executed command and passes it on to the next processor.

Let’s modify my batch file so that it returns the parameters to be passed to the next processor. In my example, the batch file moves the file given as the first parameter to the path given as the second parameter. At the end, it displays the new path of the file (second parameter) and the third parameter, to be used by the next processor (again, this is an example; in the real world such parameters would probably be computed by the executed process itself).

@echo off
REM Append all arguments to the input file (our stand-in "transformation")
ECHO %* >> %1
REM Move the input file (1st argument) to the destination path (2nd argument)
MOVE %1 %2 >nul
REM Display the new path (2nd argument) and the argument for the next step (3rd)
ECHO "%2;%3"

Note: since I configured my processor to accept arguments delimited by “;”, I display the parameters to be passed using this delimiter. It is possible to change the delimiter in the processor properties (I recommend reading the usage documentation associated with each processor; it is very helpful).

Let’s have a look at the configuration of my first ExecuteStreamCommand processor:

first_execute

This will execute my batch file with:

  • 1st parameter: the path of my input file, listed by the ListFiles processor
  • 2nd parameter: the destination path of my file
  • 3rd parameter: a parameter to be passed to the next processor
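
Concretely, this means the Command Arguments property of this first processor presumably contains (with “;” as the argument delimiter):

${absolute.path}${filename};D:\step1.txt;D:\step2.txt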

My batch file, once called, will move my file to “D:\step1.txt” and will display:

“D:\step1.txt;D:\step2.txt”

This will be set as the value of the attribute with the key “result” in the generated FlowFile.

The second ExecuteStreamCommand processor has the following configuration:

second_execute

Here, I use Expression Language functions (replaceAll) to remove the special characters introduced in the process (it would also be possible to adapt the batch script to avoid this operation).
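
As an illustration, an expression of this kind in the Command Arguments property would strip the quotes and whitespace added by ECHO (a sketch; the exact regex depends on what the command really outputs):

${result:replaceAll('["\\s]', '')}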

This will execute my batch file with:

  • 1st parameter: D:\step1.txt
  • 2nd parameter: D:\step2.txt
  • 3rd parameter: null

My batch file, once called, will move my file to “D:\step2.txt” and will display:

“D:\step2.txt”

In conclusion, by taking advantage of the Expression Language and by controlling the output of the executed commands, I can use the FlowFile initially created by the ListFiles processor to carry information along the flow and propagate information and arguments to the following steps.

For a better understanding, here is a picture of how the FlowFile evolves through the flow:

flow.PNG

Between the ListFiles processor and the first ExecuteStreamCommand processor, the FlowFile has the following attributes:

flow_file_1

Then between the two ExecuteStreamCommand processors:

flow_file_2

And at the end of the flow (if we want to add other processors):

flow_file_3

As a side note, we can see that the return code of the executed command is available in the FlowFile through the attribute “execution.status”. This makes it easy to route FlowFiles depending on the return code using a RouteOnAttribute processor. Thus, it is possible to implement “catch”/“retry” strategies in case of error.
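
For example, a RouteOnAttribute processor with a dynamic property like the following (a sketch) would route FlowFiles from successful executions to a dedicated “success” relationship:

success: ${execution.status:equals('0')}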

Please feel free to comment/ask questions about this post!

Let’s start!

OK… I feel that I am going to need some place to talk about my experiments, remarks, etc. So far I have been doing that in French on another site… but with my new job coming, I want to do things at a different level. Let’s start!

As a side note, I hope I will find some time to bring over some old posts I wrote on my previous blog.