XML data processing with Apache NiFi

Note – have a look at the new features in NiFi 1.7+ about XML processing in this post

I recently had to work on a NiFi workflow to process millions of XML documents per day, one of the steps being the conversion of the XML data into JSON. This raises the question of performance, and I will briefly share my observations in this post.

The two most natural approaches to convert XML data with Apache NiFi are:

  • Use the TransformXML processor with an XSLT file
  • Use a scripted processor or use a custom Java processor relying on a library

There are a few XSLT files available on the internet providing a generic way to transform any XML into a JSON document. That’s really convenient and easy to use. However, depending on your use case, you might need specific features.

In my case, I’m processing a lot of XML files based on the same input schema (XSD) and I want the output to be compliant with the same Avro schema (in order to use the record-oriented processors in NiFi). The main issue is to force the generation of an array when there is only a single element in your input.

XSLT approach

Example #1:

<MyDocument>
  <MyList>
    <MyElement>
      <Text>Some text...</Text>
      <RecordID>1</RecordID>
    </MyElement>
    <MyElement>
      <Text>Some text...</Text>
      <RecordID>2</RecordID>
    </MyElement>
  </MyList>
</MyDocument>

This XML document will be converted into the following JSON:

{
  "MyDocument" : {
    "MyList" : {
      "MyElement" : [ {
        "Text" : "Some text...",
        "RecordID" : 1
      }, {
        "Text" : "Some text...",
        "RecordID" : 2
      } ]
    }
  }
}

Example #2:

However, if you have the following XML document:

<MyDocument>
  <MyList>
    <MyElement>
      <Text>Some text...</Text>
      <RecordID>1</RecordID>
    </MyElement>
  </MyList>
</MyDocument>

The document will be converted into:

{
  "MyDocument" : {
    "MyList" : {
      "MyElement" : {
        "Text" : "Some text...",
        "RecordID" : 1
      }
    }
  }
}

Force array

And here is where the problems start… because we no longer have the same Avro schema. That is why I recommend using the XSLT file provided by Bram Stein here on GitHub. It provides a way to force the creation of an array. To do that, you need to insert an attribute into your XML input file. The attribute to insert is

json:force-array="true"

But for this attribute to be correctly interpreted, you also need to declare the corresponding namespace:

xmlns:json="http://json.org/"

In the end, using ReplaceText processors with regular expressions, you need to have the following input (for example #2):

<MyDocument xmlns:json="http://json.org/">
  <MyList>
    <MyElement json:force-array="true">
      <Text>Some text...</Text>
      <RecordID>1</RecordID>
    </MyElement>
  </MyList>
</MyDocument>

And this will give you:

{
  "MyDocument" : {
    "MyList" : {
      "MyElement" : [ {
        "Text" : "Some text...",
        "RecordID" : 1
      } ]
    }
  }
}

And now I do have the same schema describing my JSON documents. Conclusion: you need to use regular expressions to add the namespace declaration on the root tag of your document and to add the json:force-array attribute on every tag wrapping elements that should be part of an array.
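To give a concrete idea, here is a small Groovy sketch of the substitutions the ReplaceText processors perform on example #2 (I used ReplaceText in the actual workflow; the regular expressions below are only illustrative and depend on your documents):

def xml = '''<MyDocument>
  <MyList>
    <MyElement>
      <Text>Some text...</Text>
      <RecordID>1</RecordID>
    </MyElement>
  </MyList>
</MyDocument>'''

// declare the json namespace on the root tag
xml = xml.replaceFirst("<MyDocument>", '<MyDocument xmlns:json="http://json.org/">')
// force an array on every tag wrapping elements that should become a JSON array
xml = xml.replaceAll("<MyElement>", '<MyElement json:force-array="true">')

println xml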

Java approach

Now, let’s assume you’re not afraid of using scripted processors or of developing your own custom processor. Then it’s really easy to have a processor do the same thing using a Java library like org.json (note that this library is *NOT* Apache friendly in terms of licensing, which is why the following code cannot be released with Apache NiFi). Here is an example of a custom processor doing the conversion. And here is a Groovy version for the ExecuteScript processor.
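Just to give an idea of what the Groovy version can look like (this is only a sketch, not necessarily the exact code linked above), the whole conversion fits in a few lines of an ExecuteScript processor, assuming the org.json jar is made available through the module directory property:

import org.apache.nifi.processor.io.StreamCallback
import org.json.XML

def flowFile = session.get()
if (!flowFile) return

flowFile = session.write(flowFile, { inputStream, outputStream ->
    // read the XML content of the flow file and convert it with org.json
    def json = XML.toJSONObject(inputStream.getText('UTF-8'))
    outputStream.write(json.toString(2).getBytes('UTF-8'))
} as StreamCallback)

flowFile = session.putAttribute(flowFile, 'mime.type', 'application/json')
session.transfer(flowFile, REL_SUCCESS)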

What about arrays with this solution? Guess what… it’s kind of similar: you have to use a ReplaceText processor before and after the conversion to ensure that arrays are arrays in the JSON output for any number of elements in your input. Also, you might have to do some other transformations like removing the namespaces, or replacing empty strings ("") by null values (by default, everything will be converted to an empty string although you might want a null record instead).

To force arrays, the easiest approach is to double every tag that should be converted into an array. With example #2, I transform my input to have:

<MyDocument>
  <MyList>
    <MyElement /><MyElement>
      <Text>Some text...</Text>
      <RecordID>1</RecordID>
    </MyElement>
  </MyList>
</MyDocument>

It’ll give me the following JSON:

{
  "MyDocument" : {
    "MyList" : {
      "MyElement" : [ "", {
        "Text" : "Some text...",
        "RecordID" : 1
      } ]
    }
  }
}

And, then, I can use another ReplaceText processor to remove the unwanted empty strings created by the conversion.
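As an illustration only (in NiFi this is done with a ReplaceText processor and the exact regular expressions depend on your data), the cleanup boils down to substitutions like these, shown here in Groovy on a made-up JSON snippet:

// made-up JSON snippet produced by the conversion
def json = '{ "MyElement" : [ "", { "Text" : "Some text...", "RecordID" : 1 } ], "Empty" : "" }'
// drop the empty string introduced by the doubled <MyElement /> tag
json = json.replaceAll(/\[\s*"",/, '[')
// optionally, replace the remaining empty string values by null
json = json.replaceAll(/:\s*""/, ': null')
println json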

Conclusion: with both approaches you’ll need to be a bit intrusive in your data to get the expected results. What about performance now?

Benchmark

I removed the ReplaceText processors from the equation as I usually need the same amount of regular expression work in both cases. I only want to focus on:

  • the TransformXML processor with the generic XSLT file
  • the custom Java processor
  • the ExecuteScript processor with the Groovy script

I’ll compare the performance of each case using inputs of different sizes (data generated using a GenerateFlowFile processor) with the default configuration (one thread, no change on run duration, etc.) on my laptop.

Method: I’m generating as much data as possible (it’s always the same file during a single run) using the GenerateFlowFile processor. I wait at least 5 minutes to reach a constant rate of processing and I take the mean over a 5-minute window of constant processing.

Screen Shot 2017-09-07 at 12.12.12 AM.png

For each run, I’m only running the GenerateFlowFile, one of the three processors I’m benchmarking, and the UpdateAttribute (used to only drop the data).

The input data used for the benchmark is a fairly complex XML document with arrays of arrays, a lot of elements in the arrays, deeply nested records, etc. To reduce the input size, I’m not changing the structure but only removing elements from the arrays. In other words: the schema describing the output data remains the same for each run.

Note that the custom Java/Groovy option loads the full XML document in memory. To process very large XML documents, a streaming approach with another library would certainly be better suited.

Here are the results with input data of 5KB, 10KB, 100KB, 500KB and 1000KB. The graph below gives the number of XML files processed per second based on the input size for each solution.

Screen Shot 2017-09-07 at 10.16.45 PM

It’s clear that the custom Java processor is the most efficient one. The XSLT option is really nice when you want to do very specific transformations but it can quickly get slow. Using a generic XSLT file for XML to JSON transformation is easy and convenient but won’t be the most efficient option.

We can also notice that the Groovy option is a little bit less efficient than the Java one, but that’s expected. Nevertheless, the Groovy option provides pretty good performances and does not require building and compiling a custom processor: everything can be done directly from the NiFi UI.

To improve performance, it’s then possible to play with the “run duration” parameter and to increase the number of concurrent tasks. Actually, it’s quite easy to reach the I/O limits of the disks. Using a NiFi cluster and multiple disks for the content repository, it’s really easy to process hundreds of millions of XML documents per day.

If we display the performance ratio based on the file size between the XSLT solution and the Java based solution, we have:

Screen Shot 2017-09-07 at 10.28.46 PM

We can see that with very small files, processing with the Java-based processor is about 13x more efficient than the XSLT approach. But with files over 100KB, the Java solution is about 26x more efficient. That’s because the NiFi framework does a few things before and after a flow file has been processed. When processing thousands of flow files per second, this creates a small per-file overhead that explains the difference.

XML Record Reader

For a few versions now, Apache NiFi has included record-oriented processors. They provide very powerful means to process record-oriented data. In particular, they allow users to process batches of data instead of processing per file, which provides very robust and high-rate processing. While I’m writing this post there is no reader for XML data yet. However, there is a JIRA for it and it would provide a few interesting features:

  • By using a schema describing the XML data, it’d remove the need to use ReplaceText processors to handle the “array problem”.
  • It’d give the possibility to merge XML documents together to process much more data at once, providing even better performance.

This effort can be tracked under NIFI-4366.

As usual, feel free to post any comment/question/feedback.

Monitoring NiFi – Scripted Reporting Task

Note – This article is part of a series discussing subjects around NiFi monitoring.

In the new release of Apache NiFi (1.2.0), you can now develop Scripted Reporting Tasks thanks to NIFI-1458. It follows the same approach as the ExecuteScript processor, for which you have tons of great examples here.

You might also want to read the following posts:

With the ScriptedReportingTask you can define your own implementation of the onTrigger() method and get access to:

  • ReportingContext context (which gives you access to various information such as events, provenance, bulletins, controller services, process groups, etc)
  • VirtualMachineMetrics vmMetrics (to access the metrics of the JVM)
  • ComponentLog log (if you want to log messages)

Let’s start with a very easy example: I want to log the number of threads inside my JVM every minute. Here is my code:

log.info("Thread count = " + vmMetrics.daemonThreadCount())

Screen Shot 2017-05-12 at 5.43.45 PM.png

And I can check in my nifi-app.log file that I do have:

2017-05-12 17:43:27,639 INFO [Timer-Driven Process Thread-5] o.a.n.r.script.ScriptedReportingTask ScriptedReportingTask[id=fd1668eb-015b-1000-1974-5ef96e1f9a8b] Thread count = 29

OK… now I won’t go into the details of all the information you can access using the “context” variable but let’s try another example…

I want to send a POST request over HTTP containing a JSON representation of the summary of my root process group.

I’m not really used to Groovy so please excuse my coding style ;-). But here is working code (available here as well):

def json = Class.forName("javax.json.Json")
def httpClients = Class.forName("org.apache.http.impl.client.HttpClients")
def contentType = Class.forName("org.apache.http.entity.ContentType")
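// the three classes above are loaded by name at runtime: the corresponding jars are made
// available to the script through the module directory property of the reporting task (see below)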

def status = context.getEventAccess().getControllerStatus();
def factory = json.createBuilderFactory(Collections.emptyMap());
def builder = factory.createObjectBuilder();

builder.add("componentId", status.getId());
builder.add("bytesRead", status.getBytesRead());
builder.add("bytesWritten", status.getBytesWritten());
builder.add("bytesReceived", status.getBytesReceived());
builder.add("bytesSent", status.getBytesSent());
builder.add("bytesTransferred", status.getBytesTransferred());
builder.add("flowFilesReceived", status.getFlowFilesReceived());
builder.add("flowFilesSent", status.getFlowFilesSent());
builder.add("flowFilesTransferred", status.getFlowFilesTransferred());
builder.add("inputContentSize", status.getInputContentSize());
builder.add("inputCount", status.getInputCount());
builder.add("outputContentSize", status.getOutputContentSize());
builder.add("outputCount", status.getOutputCount());
builder.add("queuedContentSize", status.getQueuedContentSize());
builder.add("activeThreadCount", status.getActiveThreadCount());
builder.add("queuedCount", status.getQueuedCount());

def requestEntity = new org.apache.http.entity.StringEntity(builder.build().toString(), contentType.APPLICATION_JSON);
def httpclient = httpClients.createDefault();
def postMethod = new org.apache.http.client.methods.HttpPost("http://localhost:9999/rootStatus");
postMethod.setEntity(requestEntity);
httpclient.execute(postMethod);
httpclient.close();

Note – this should be improved to, for instance, properly handle potential exceptions.
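For instance, a possible improvement (just a sketch) is to catch exceptions and close the HTTP client in a finally block:

def httpclient = httpClients.createDefault();
try {
    def postMethod = new org.apache.http.client.methods.HttpPost("http://localhost:9999/rootStatus");
    postMethod.setEntity(requestEntity);
    httpclient.execute(postMethod);
} catch (Exception e) {
    // log the error instead of letting the reporting task fail silently
    log.error("Unable to send the root process group status", e);
} finally {
    httpclient.close();
}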

To get this code working, I also have to specify the required dependencies (JSON, Apache HTTP, etc). For that, in the module directory property of the reporting task, I gave the following paths (because I’m lazy, I am pointing to more dependencies than required):

  • /var/lib/nifi/work/nar/extensions/nifi-standard-nar-1.2.0.nar-unpacked/META-INF/bundled-dependencies/
  • /var/lib/nifi/work/nar/extensions/nifi-site-to-site-reporting-nar-1.2.0.nar-unpacked/META-INF/bundled-dependencies/

In my example I’m sending my JSON payload with a POST HTTP request to localhost on port 9999 with the path rootStatus. To receive the request, I started a ListenHttp processor with the following configuration:

Screen Shot 2017-05-12 at 8.00.31 PM.png

Once my reporting task is started, I start receiving the information as flow files:

Screen Shot 2017-05-12 at 8.08.55 PM.png

This scripted reporting task allows you to quickly develop proofs of concept to send information to your internal systems using the interfaces you want. However, depending on your needs, it might be more interesting to develop your own reporting task in Java and to build the corresponding NAR. It will give you more flexibility/options (you’ll be able to implement more interfaces) and better performance.

As usual, feel free to ask questions and comment on this post.

Using counters in Apache NiFi

You may not know it, but you have the ability to define and play with counters in NiFi. If policies are correctly configured (if your NiFi is secured), you should be able to access the existing counters using the menu:

Screen Shot 2017-02-07 at 5.33.59 PM.png

Counters are just values that you can increase or decrease by a given delta. This is useful if you want to monitor particular values along your workflow. At the moment, unless you use a processor that explicitly uses counters or provides a way to define counters, there is nothing available out of the box.

The best way to define and update counters is to use the ExecuteScript processor with the following piece of Groovy code:

def flowFile = session.get()
if(!flowFile) return
session.adjustCounter("my-counter", 1, true)
session.transfer(flowFile, REL_SUCCESS)

With this example, the ExecuteScript processor will just transfer the flow file, without any modification, to the success relationship, but will also increment the counter “my-counter” by 1. If this counter does not exist, it will be initialized with the delta value given as an argument.
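And since the delta can be negative (see the documentation below), the same call can be used to decrease a counter:

session.adjustCounter("my-counter", -1, true)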

Here is the documentation of this method:

    /**
     * Adjusts counter data for the given counter name and takes care of
     * registering the counter if not already present. The adjustment occurs
     * only if and when the ProcessSession is committed.
     *
     * @param name the name of the counter
     * @param delta the delta by which to modify the counter (+ or -)
     * @param immediate if true, the counter will be updated immediately,
     *            without regard to whether the ProcessSession is commit or rolled back;
     *            otherwise, the counter will be incremented only if and when the
     *            ProcessSession is committed.
     */
    void adjustCounter(String name, long delta, boolean immediate);

Let’s see an example: I want to confirm the correct behavior of the GetHDFS processor when I have multiple instances of this processor looking into the same directory but getting different flow files based on a regular expression.

Here is the first part of my flow:

screen-shot-2017-02-07-at-6-20-52-pm

Basically, I am generating flow files every 1ms with GenerateFlowFile. The generated flow files will be named after the generation date timestamp (without any extension). I am sending the files into HDFS and then I’m using a RouteOnAttribute where I check the filename against a regular expression to split files into even and uneven names. This way I can increment counters tracking the number of files with even names and with uneven names that I sent to HDFS.
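To illustrate the counter part (this is only a sketch, not necessarily the exact configuration behind the screenshots), a single ExecuteScript processor could pick the counter to increment based on the parity of the filename:

def flowFile = session.get()
if (!flowFile) return
// the filename is a generation timestamp without any extension, so it is numeric
def filename = flowFile.getAttribute('filename')
def counter = (filename.toLong() % 2 == 0) ? 'even-producer' : 'uneven-producer'
session.adjustCounter(counter, 1, true)
session.transfer(flowFile, REL_SUCCESS)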

Here is the second part of my flow:

Screen Shot 2017-02-07 at 6.27.28 PM.png

I have two instances of the GetHDFS processor configured to look into the same input directory, but one with a regular expression looking for files with an even name, and one looking for files with an uneven name, so that there is no concurrent access. Besides, the processors are configured to delete the file on HDFS once the file is retrieved in NiFi. Then I update two different counters: one to track the number of files with an even name that I retrieved from HDFS, and one for files with an uneven name.

If everything is working correctly, I should be able to let my workflow run for a bit, then stop the generation of flow files, wait for all the flow files to be processed and confirm that:

  • the even-producer counter is equal to the even-consumer counter
  • the uneven-producer counter is equal to the uneven-consumer counter

Let’s have a look into our counters table:

screen-shot-2017-02-07-at-6-47-02-pm

It looks like we are all good 😉

As a remark, if you have multiple processors updating the same counter, then you will have the global value of the counter but also the value at each processor level. For example, if I have:

screen-shot-2017-02-07-at-6-53-09-pm

With both ExecuteScript processors incrementing the same “test” counter, I’ll have:

screen-shot-2017-02-07-at-6-55-48-pm

Also, as a last remark, you can notice that it’s possible to reset a counter to 0 from the counters table with the button in the last column (assuming you have write access to the counters based on the defined policies). It can be useful when doing some tests.

As always, questions/comments are welcomed!

OAuth 1.0A with Apache NiFi (Twitter API example)

A lot of APIs use the OAuth protocol to authorize the received requests and to check that everything is OK regarding the identity of the request sender.

OAuth is an open standard for authorization, commonly used as a way for Internet users to log into third party websites using their Microsoft, Google, Facebook, Twitter, One Network etc. accounts without exposing their password. Generally, OAuth provides to clients a “secure delegated access” to server resources on behalf of a resource owner. It specifies a process for resource owners to authorize third-party access to their server resources without sharing their credentials. Designed specifically to work with Hypertext Transfer Protocol (HTTP), OAuth essentially allows access tokens to be issued to third-party clients by an authorization server, with the approval of the resource owner. The third party then uses the access token to access the protected resources hosted by the resource server.

As a remark, there are two versions of the protocol currently used out there: 1.0A and 2.0. As far as I know, 1.0A is the most commonly used. I already faced the need to use OAuth 1.0A protocol with the Flickr API but, back then, I found a way to get my data differently.

Recently, a question was asked on the Hortonworks Community Connection regarding the use of Apache NiFi to get data from Twitter API using OAuth 1.0A protocol. So this time, I decided to have a look on this and to get the job done.

This post presents the flow I used to perform a request against the Twitter API using the OAuth protocol. It gives me the opportunity to use the ExecuteScript processor for the first time, which allows users to execute custom scripts on the fly inside NiFi (you will find a lot of examples on this great site).

Note 1: this was the first time I used the Groovy language, so be nice with me!

Note 2: I didn’t test the flow on a lot of methods. Some modifications may be necessary for some cases.

OK. The objective was to request the “users/lookup” method of the Twitter API. You can read the documentation here.

I want to perform an HTTP GET on:

https://api.twitter.com/1.1/users/lookup.json?screen_name=twitterapi,twitter

So far it seems really easy to do with a simple InvokeHTTP processor. The thing is, you need to identify yourself when sending the request. Here comes the OAuth protocol. The official specification for 1.0A can be read here. But in the case of the Twitter API, you have nice documentation here.

Besides, on the documentation page of each method, you have an OAuth Signature Generator that can be accessed (if you have defined a Twitter App). The generator is here. It lets you play around and gives great insight into each request to debug your own implementation of the OAuth protocol.

The global idea is: you have a lot of input parameters and you must follow the specification to construct a string based on these parameters. This string will be the value associated with the “Authorization” key in the HTTP header properties.

Here is the list of the needed parameters. First, the parameters directly linked to your request:

  • HTTP method (GET in this example)
  • Base URL (the URL of the method without any argument)
  • Arguments of the request (the query string parameters)

Then the global parameters related to OAuth:

  • Consumer key (private information of your app provided by Twitter)
  • Consumer secret (private information of your app provided by Twitter)
  • Nonce (random string, uniquely generated for each request)
  • Signature method (with Twitter it is HMAC-SHA1)
  • Timestamp (in seconds)
  • Token (private information of your app provided by Twitter)
  • Token secret (private information of your app provided by Twitter)
  • Version (in this case 1.0)

The first step is to construct the “signature base string“. For that you first need to create the “parameter string“. Everything is very well explained here. Once you have the signature base string, you can sign it using HMAC-SHA1 and you easily get the header property to set in your HTTP request:

Authorization: OAuth oauth_consumer_key="*******", oauth_nonce="a9ab2392e5158a4c4e029c7829164571", oauth_signature="4s4Hi5hQ%2FoLKprW7qsRlImds3Og%3D", oauth_signature_method="HMAC-SHA1", oauth_timestamp="1460453975", oauth_token="*******", oauth_version="1.0"

Let’s get into the details using Apache NiFi. Here is the flow I created:

oauthFlow

I use a GenerateFlowFile to generate an empty Flow File (FF) in order to execute my flow. Then I use an UpdateAttribute processor to add attributes to my FF. In this case, I only add the parameters related to the specific request I want to execute:

globalParam

Then I send my FF into a process group that will compute the header property to set (I will come back to this part later). Then I perform my request using the InvokeHTTP processor:

invokeHTTP

I set the method to GET, the URL to my corresponding FF attribute, the content type to text/plain, and I add a custom property named Authorization with the FF attribute I created in my process group (see below). This custom property will be added as an HTTP header in the request. At the end, I use a PutFile processor to write the result of my request to a local directory.

Let’s go to the interesting part of our flow where all the magic is: the process group I named OAuth 1.0A. Here it is:

processGroup

I just use two processors. The first one is an UpdateAttribute to add all the parameters I need as attributes of my FF. The second one is an ExecuteScript processor where I put my Groovy code to compute the header property.

First… the parameters:

oauthParameters

Note: I use Expression Language provided by NiFi for some attributes.

  • arguments is used to extract the argument part in my target URL. In this example: screen_name=twitterapi,twitter
  • base_url is the URL I request without any argument. In this example: https://api.twitter.com/1.1/users/lookup.json
  • For the nonce parameter I use the “UUID” method of the expression language, which generates a random string, and I just replace the ‘-‘ characters to only keep an alphanumeric string.
  • For the timestamp, I use the “now” method of the expression language and I use substring to only keep seconds.

Let’s move to the ExecuteScript part. I set the script engine to Groovy and I put my script code in the “script body” property. The full code is available at the end of the post. Let’s go through it piece by piece.

First thing, I want to trigger my code only when a FF is available:

def flowFile = session.get()
if (!flowFile) return

Then I define a method I will use for the HMAC-SHA1 encoding:

def static hmac(String data, String key) throws java.security.SignatureException
{
    String result
    try {
        // get an hmac_sha1 key from the raw key bytes
        SecretKeySpec signingKey = new SecretKeySpec(key.getBytes(), "HmacSHA1");
        // get an hmac_sha1 Mac instance and initialize with the signing key
        Mac mac = Mac.getInstance("HmacSHA1");
        mac.init(signingKey);
        // compute the hmac on input data bytes
        byte[] rawHmac = mac.doFinal(data.getBytes());
        result= rawHmac.encodeBase64()
    } catch (Exception e) {
        throw new SignatureException("Failed to generate HMAC : " + e.getMessage());
    }
    return result
}

For this part, I will need to add some imports at the beginning of my script body:

import java.security.SignatureException
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec

Then I retrieve all the attributes of my FF and I extract the ones I will need to build the parameter string and the signature:

def attributes = flowFile.getAttributes()
// retrieve arguments of the target and split arguments
def arguments = attributes.arguments.tokenize('&')
def method = attributes.method
def base_url = attributes.base_url
def consumerSecret = attributes.oauth_consumer_secret
def tokenSecret = attributes.oauth_token_secret

Then I create a TreeMap in which I add all the parameters I need to construct my parameter string. A TreeMap ensures that the entries are sorted by key in alphabetical order.

TreeMap map = [:]

for (String item : arguments) {
        def (key, value) = item.tokenize('=')
        map.put(key, value)
}

map.put("oauth_consumer_key", attributes.oauth_consumer_key)
map.put("oauth_nonce", attributes.oauth_nonce)
map.put("oauth_signature_method", attributes.oauth_signature_method)
map.put("oauth_timestamp", attributes.oauth_timestamp)
map.put("oauth_token", attributes.oauth_token)
map.put("oauth_version", attributes.oauth_version)

Then I add a method to the String class to allow percent encoding on String objects:

String.metaClass.encode = {
    java.net.URLEncoder.encode(delegate, "UTF-8").replace("+", "%20").replace("*", "%2A").replace("%7E", "~");
}

I am now able to construct the parameter string:

String parameterString = ""

map.each { key, value ->
    parameterString += key.encode()
    parameterString += '='
    parameterString += value.encode()
    parameterString += '&'
}

parameterString = parameterString.substring(0, parameterString.length()-1);

Update: the code above can be simplified as below (see Andy’s comment)

String parameterString = map.collect { String key, String value ->
    "${key.encode()}=${value.encode()}"
}.join("&")

It is now possible to get the signature:

String signatureBaseString = ""
signatureBaseString += method.toUpperCase()
signatureBaseString += '&'
signatureBaseString += base_url.encode()
signatureBaseString += '&'
signatureBaseString += parameterString.encode()

String signingKey = consumerSecret.encode() + '&' + tokenSecret.encode()
String oauthSignature = hmac(signatureBaseString, signingKey)

I may add this information as a new attribute of my FF:

flowFile = session.putAttribute(flowFile, 'oauth_signature', oauthSignature)

Then I can construct the header property value to associate with the Authorization key:

String oauth = 'OAuth '
oauth += 'oauth_consumer_key="'
oauth += attributes.oauth_consumer_key.encode()
oauth += '", '
oauth += 'oauth_nonce="'
oauth += attributes.oauth_nonce.encode()
oauth += '", '
oauth += 'oauth_signature="'
oauth += oauthSignature.encode()
oauth += '", '
oauth += 'oauth_signature_method="'
oauth += attributes.oauth_signature_method.encode()
oauth += '", '
oauth += 'oauth_timestamp="'
oauth += attributes.oauth_timestamp.encode()
oauth += '", '
oauth += 'oauth_token="'
oauth += attributes.oauth_token.encode()
oauth += '", '
oauth += 'oauth_version="'
oauth += attributes.oauth_version.encode()
oauth += '"'

I add this information as an attribute (that will be used in the InvokeHTTP processor as we saw before) and I forward my FF to the success relationship:

flowFile = session.putAttribute(flowFile, 'oauth_header', oauth)
session.transfer(flowFile, REL_SUCCESS)

That’s it: I have an operational flow implementing the OAuth 1.0A protocol to perform requests against the Twitter API.

The full code is available here as a gist.
The NiFi template is available here.

As always, feel free to ask questions and comment on this post!