Transform data with Apache NiFi

A few days ago, I started looking into Apache NiFi, which is now part of the Hortonworks Data Flow (HDF) distribution. Based on my experience at Capgemini and the kinds of projects I have been involved in, I immediately realized that it is a powerful system that can be used in a wide range of situations and problems.

Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data. It supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic. Some of the high-level capabilities and objectives of Apache NiFi include:

  • Web-based user interface
    • Seamless experience between design, control, feedback, and monitoring
  • Highly configurable
    • Loss tolerant vs guaranteed delivery
    • Low latency vs high throughput
    • Dynamic prioritization
    • Flow can be modified at runtime
    • Back pressure
  • Data Provenance
    • Track dataflow from beginning to end
  • Designed for extension
    • Build your own processors and more
    • Enables rapid development and effective testing
  • Secure
    • SSL, SSH, HTTPS, encrypted content, etc…
    • Pluggable role-based authentication/authorization

It must also be noted that it is really easy to get started with NiFi (whatever OS you are using): just download it, run it, and open your favorite web browser at localhost:8080.
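For example, assuming Java is installed and the NiFi binary archive has been downloaded and extracted, starting NiFi boils down to one command run from the NiFi root directory:

bin\run-nifi.bat (on Windows)
./bin/nifi.sh start (on Linux/macOS)

The UI is then available at http://localhost:8080/nifi.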

Now you may tell yourself that it is just a tool to get data from a point A to a point B according to parameters, conditions, and so on. In other words, you may think it is just a tool to extract and load data. So what transformation features does NiFi offer?

Just have a look at the list of 125+ processors available at the moment and you will get a good idea of what you can achieve. But let's say you need to perform some very specific actions on your data and no processor suits your need. So what's next?

First, you can write your own processor; it is very easy and straightforward (see the NiFi developer guide). Otherwise, you can leverage existing processors to execute custom code. For example, you can achieve a lot with the ExecuteScript and ExecuteStreamCommand processors. If you are interested in the former, have a look at this blog to find useful and complete examples.

In this post, I want to focus on ExecuteStreamCommand and how it can be used to define data transformation flows. One common use case I see is getting files from one place and executing an application to transform them.

For simplicity, let's say I have an input directory into which files to be processed arrive. For each file A in this directory, I want to execute an application that transforms the data into a new file B:

[Figure: transformation of file A into file B]

This can be easily achieved by the combination of two processors: ListFiles and ExecuteStreamCommand.

Here is an example running on Windows: I look for any new file in "D:\tmp-input" with a ListFiles processor using the following properties:

[Screenshot: ListFiles processor properties]
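The original screenshot is no longer available; as an indication, a minimal configuration matching this example would look like the following (property names come from the processor documentation, values are assumptions based on this example):

Input Directory: D:\tmp-input
File Filter: [^\.].*
Recurse Subdirectories: true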

For each new file arriving in this directory, the processor will generate a FlowFile (see the NiFi documentation to learn about NiFi principles) with some useful attributes and no content.
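Among the attributes set by ListFiles on each generated FlowFile (see the processor documentation for the exhaustive list):

filename: the name of the listed file
path / absolute.path: the relative and absolute paths of the file's directory
file.size, file.owner, file.lastModifiedTime: metadata about the listed file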

Now, I have a batch file that I want to execute on each file. This batch file takes exactly one parameter: the path of the file to be processed.

My batch file (command.bat) is very simple: it moves the given file into another directory (obviously, this is just an example; if we just wanted to move files, NiFi could do that without executing commands!):

@echo off
REM Move the file given as first argument into D:\tmp
MOVE %1 "D:\tmp" >nul

Then my ExecuteStreamCommand processor will have the following properties:

[Screenshot: ExecuteStreamCommand processor properties]
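Since this screenshot is also missing, here is a plausible reconstruction based on the explanations below (the location of command.bat is an assumption):

Command Path: D:\command.bat
Command Arguments: ${absolute.path}${filename}
Argument Delimiter: ;
Ignore STDIN: true
Output Destination Attribute: result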

Using the Expression Language provided with NiFi, I can build the path of the file previously listed by the ListFiles processor from the attributes of the incoming FlowFile:

${absolute.path}${filename}

This is the concatenation of the attributes "absolute.path" and "filename" of the incoming FlowFile (attributes set by the ListFiles processor).

The processor can send the content of the incoming FlowFile to the executed process, but in my case there is no content and I don't want that anyway (Ignore STDIN = true). Besides, this processor can create a new FlowFile using the output of the command as the content of the newly created FlowFile. This is not what I want with this command, so I set the property "Output Destination Attribute" to the value "result". This way, the output of my command is stored as a new (or updated) attribute of my original FlowFile.

If you don't need this FlowFile anymore, you can auto-terminate the processor's relationships, and you have a ready-to-go flow for processing files with your own command.

If you want another process to run on the newly created file (in D:\tmp), you can copy/paste this flow logic and use another ListFiles processor to scan the directory.

[Screenshot: basic flow with ListFiles and ExecuteStreamCommand]

So far, this is a very basic operation. Something closer to a real use case would be a flow where the ExecuteStreamCommand processor gets information back from the executed command and passes it to the next processor.

Let's modify my batch file so that it returns the parameters to be passed to the next processor. In my example, the batch file moves the file given as the first parameter to the path given as the second parameter. At the end, it displays the new path of the file (second parameter) and the third parameter, to be used by the next processor (again, this is an example; in the real world, such parameters would probably be computed by the executed process itself).

@echo off
REM Append the full argument list to the input file (a dummy "transformation")
ECHO %* >> %1
REM Move the input file (1st argument) to the destination path (2nd argument)
MOVE %1 %2 >nul
REM Display the new path and the parameter for the next step, separated by ";"
ECHO "%2;%3"

Note: since I configured my processor to expect arguments delimited with ";", I display the parameters to be passed using this delimiter. It is possible to change the delimiter in the processor properties (I recommend reading the usage documentation associated with each processor; it is very helpful).

Let's have a look at the configuration of my first ExecuteStreamCommand processor:

[Screenshot: configuration of the first ExecuteStreamCommand processor]
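Again, here is a plausible reconstruction of the configuration based on the parameters listed below (the paths are the ones used in this example):

Command Path: D:\command.bat
Command Arguments: ${absolute.path}${filename};D:\step1.txt;D:\step2.txt
Argument Delimiter: ;
Ignore STDIN: true
Output Destination Attribute: result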

This will execute my batch file with:

  • 1st parameter: the path of my input file listed by the ListFiles processor
  • 2nd parameter: the destination path of my file
  • 3rd parameter: a parameter to be passed to the next processor

My batch file, once called, will move my file to “D:\step1.txt” and will display:

"D:\\step1.txt;D:\\step2.txt"

This will be set as the value of the attribute with "result" as key in the generated FlowFile.

The second ExecuteStreamCommand processor has the following configuration:

[Screenshot: configuration of the second ExecuteStreamCommand processor]
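The exact values were only visible in the screenshot; based on the explanation below, the arguments are built from the "result" attribute with the surrounding quotes stripped, along these lines (the exact expression is an assumption):

Command Path: D:\command.bat
Command Arguments: ${result:replaceAll('"', ''):trim()}
Argument Delimiter: ;
Ignore STDIN: true
Output Destination Attribute: result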

Here, I use Expression Language functions (replaceAll) to remove special characters introduced in the process (it is possible to adapt the batch script to avoid this operation).

This will execute my batch file with:

  • 1st parameter: D:\\step1.txt
  • 2nd parameter: D:\\step2.txt
  • 3rd parameter: null

My batch file, once called, will move my file to “D:\step2.txt” and will display:

"D:\\step2.txt"

In conclusion, by taking advantage of the Expression Language and by controlling the output of the executed commands, I can use the FlowFile initially created by the ListFiles processor to carry information along the flow and propagate arguments to the following steps.

For a better understanding, here is a picture of how the FlowFile evolves through the flow:

[Figure: complete flow]

Between the ListFiles processor and the first ExecuteStreamCommand processor, the FlowFile has the following attributes:

[Screenshot: FlowFile attributes after ListFiles]

Then between the two ExecuteStreamCommand processors:

[Screenshot: FlowFile attributes after the first ExecuteStreamCommand]
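Most of this screenshot can be reconstructed from the processor documentation: in addition to the attributes set by ListFiles, the first ExecuteStreamCommand processor has added its own attributes, typically:

result: "D:\\step1.txt;D:\\step2.txt"
execution.command: D:\command.bat
execution.command.args: the arguments passed to the command
execution.status: 0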

And at the end of the flow (if we want to add other processors):

[Screenshot: FlowFile attributes after the second ExecuteStreamCommand]

As a side note, we can see that the return code of the executed command is available in the FlowFile via the "execution.status" attribute. This makes it easy to route FlowFiles depending on the return code, using the RouteOnAttribute processor. Thus, it is possible to implement "catch" / "retry" strategies in case of error.
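As a sketch, a RouteOnAttribute processor with a single dynamic property (the property name "failed" is arbitrary; it creates a relationship of the same name) could look like this:

failed: ${execution.status:equals('0'):not()}

Every FlowFile whose command returned a non-zero exit code is then routed to the "failed" relationship, to which a retry or alerting branch can be attached.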

Please feel free to comment/ask questions about this post!

23 thoughts on “Transform data with Apache NiFi”

  1. I have SelectHiveQL delivering a single flowfile to ExecuteStreamCommand, similar to your post. It runs a one-line Unix script to determine the max timestamp value of the last column and sets Output Destination Attribute. However, absolute.path holds no value. I tried using path (./) but this too does not find the file. Any ideas as to why absolute.path is empty?


    • Hi Wesley, not sure what the issue is, but you might be interested in the new QueryRecord processor, which allows you to execute a query against the content of your flow file. That would probably allow you to get what you're looking for.


  2. Hi, I have preprocessing code written in R and I am trying to use ExecuteStreamCommand to run it on the data that I fetched using NiFi. I am not sure what inputs I should provide in ExecuteStreamCommand to run my R script. Can you help me with this?


    • Hi, I have no knowledge of R, but if you have a command working in your shell, then you should be able to execute the same command using the processor in NiFi: use the first part of your command as the Command Path property and the other parts of your command as the Command Arguments property (using the defined delimiter). For example, if I want to execute:
      myCommand arg1 arg2
      I'll set myCommand as Command Path,
      I'll set arg1;arg2 as Command Arguments,
      and leave ; as the Argument Delimiter.
      Hope this helps. If not, feel free to subscribe to the users mailing list and send your question to the community 😉


    • Hi Fauztina,
      were you able to run the R code in ExecuteStreamCommand? If yes, can you please let me know how you got it to run?


  3. Hello, I am trying to list a Google storage bucket using gsutil ls -L gs://bucket/obj1 with an ExecuteStreamCommand processor; the resulting output attribute in the flowfile shows incomplete data. I want to capture only the Hash (md5) and Content-Length from the result into new attributes in the flowfile. Any suggestions on how to achieve this?


  4. Hi team, I have a case in NiFi: I am running NiFi on my laptop and I am trying to create a flow to execute a shell script on a Linux machine.
    That is, from my local NiFi, I want to execute a shell script on a remote machine. If you have a template, could you share it?


  5. Hi,

    I'm taking input from the GetFile processor and sending it to the ExecuteStreamCommand processor. Initially I used the ExecuteScript processor with Python. Now I'm thinking of switching to the ExecuteStreamCommand processor for better efficiency. Can I use the same code I used in the ExecuteScript processor?
    I have attached my code here.

    import java.io
    from org.apache.commons.io import IOUtils
    from java.nio.charset import StandardCharsets
    from org.apache.nifi.processor.io import StreamCallback
    import unicodecsv as csv
    from faker import Factory
    from collections import defaultdict
    import json
    import csv
    import io
    import re

    class TransformCallback(StreamCallback):
        def __init__(self):
            pass

        def process(self, inputStream, outputStream):
            inputdata = IOUtils.toString(inputStream, StandardCharsets.ISO_8859_1)
            text = csv.DictReader(io.StringIO(inputdata))
            faker = Factory.create()
            faker.seed_instance(1234)
            names = defaultdict(faker.name)
            emails = defaultdict(faker.email)
            ssns = defaultdict(faker.ssn)
            phone_numbers = defaultdict(faker.phone_number)
            output = defaultdict(list)
            outputlist = list()
            outputstr = ''
            for row in text:
                for k, v in row.items():
                    if k == "name":
                        output['name'] = names[v]
                    elif k == "email":
                        output['email'] = emails[v]
                    elif k == "ssn":
                        output['ssn'] = ssns[v]
                    elif k == "phone_number":
                        output['phone_number'] = phone_numbers[v]
                    else:
                        output[k] = v
                outputlist = output.values()
                outputstring = str(outputlist).replace('[', '').replace(']', '').replace("'", '').replace(', ', ',').replace('u', '')
                outputstr += outputstring + "\n"
            outputStream.write(outputstr.encode('utf-8'))

    flowFile = session.get()
    if flowFile is not None:
        flowFile = session.write(flowFile, TransformCallback())
        session.transfer(flowFile, REL_SUCCESS)
        session.commit()

    Can I just set the Command Path to the path of this script and leave the arguments empty so that it takes the input flowfile, or do I have to change something in the code to accomplish my requirement?

    Thanks,
    Vyshali


  6. Hello,

    My ExecuteStreamCommand processor is configured to execute a custom Java app, which I packaged as a jar and placed in the /tmp directory on a NiFi host (Linux). I set the permissions to 777, and I can go to the directory and run the app without a problem. I can tell from the NiFi log that the command path, parameters, and the working directory are all correct. Yet, the process fails due to this error:

    2017-12-20 12:27:18,426 ERROR [Timer-Driven Process Thread-13] o.a.n.p.standard.ExecuteStreamCommand ExecuteStreamCommand[id=4cd9b9ef-0160-1000-0000-0000427bf59a] Failed to process session due to org.apache.nifi.processor.exception.ProcessException: java.io.IOException: Cannot run program "java -cp /tmp/mydir/myapp.jar com.myapp.MyMainClass" (in directory "/tmp/mydir"): error=2, No such file or directory: {}
    org.apache.nifi.processor.exception.ProcessException: java.io.IOException: Cannot run program "java -cp /tmp/mydir/myapp.jar com.myapp.MyMainClass" (in directory "/tmp/mydir"): error=2, No such file or directory
    at org.apache.nifi.processors.standard.ExecuteStreamCommand.onTrigger(ExecuteStreamCommand.java:339)
    … (lines omitted)
    Caused by: java.io.IOException: error=2, No such file or directory

    I walked through the source code of the processor, but it didn't help with finding the root cause. I would appreciate any advice here.

    Thank you,
    Gene


    • I figured it out. The problem was that the command path must be just the name of the executable, in my case "java". All arguments to java must be specified as the command arguments. My mistake was putting more than the value "java" in the command path, so the whole string was interpreted as a file path, causing the "No such file or directory" error.


    • Hi Gene,
      Can you share the configuration of your processor? You need to be careful about how it works. For example, if the command you want to execute is:
      java -cp myjar.jar Main.class
      Then configuration would be:
      Command arguments: -cp;myjar.jar;Main.class
      Command path: java
      Argument delimiter: ;


  7. Pierre, thank you for this example. Can you refer me to an approach that shows how ExecuteStreamCommand would handle a command that outputs multiple flowfiles from one input flowfile? Unrar, for example. If I have a flowfile that is a rar file and feed that as input to an ExecuteStreamCommand processor instance calling unrar, I will typically get many files as results. Can ExecuteStreamCommand be used to output each output file as its own flowfile? Thank you.


    • Hi Jim, that's an interesting question, and I'm not sure that one execution of the ExecuteStreamCommand processor is able to generate multiple output flow files for one input flow file. I'd say it depends on the use case, but you might need to look into a multi-step approach: have ESC un-rar your file into an output directory, and then have a List/Fetch File processor looking into that directory. Also, did you look at the compress processors to handle your rar files? (Not sure if rar is supported.)


      • Thank you very much for the reply, Pierre. Yes indeed: I looked into the compress processors but they did not appear to handle rar files. What I wound up doing was this:
        1. yum installed an unrar executable
        2. used a RouteOnAttribute processor to filter all rar files to the unrar step, called from an ExecuteUnrarCommand processor
        3. in that ExecuteUnrarCommand processor I was required to specify a temporary working directory so that unrar could do its thing. I was unable to figure out a way to keep this all in memory, which I imagined would avoid expensive writes of any files to disk when I unpacked the rar files.
        Not as elegant or high-performance as I had hoped, but to date it has worked fine and has not had any problem keeping up with flow volume.

