In the new release of Apache NiFi (1.2.0), you can now develop Scripted Reporting Task thanks to NIFI-1458. It is the same approach as with the ExecuteScript processor for which you have tons of great examples here.
You might also want to read the following posts:
With the ScriptedReportingTask you can define your own implementation of the onTrigger() method and get access to:
- ReportingContext context (which gives you access to various information such as events, provenance, bulletins, controller services, process groups, etc)
- VirtualMachineMetrics vmMetrics (to access the metrics of the JVM)
- ComponentLog log (if you want to log messages)
Let’s start with a very easy example: I want to log the number of threads inside my JVM every minute. Here is my code:
log.info("Thread count = " + vmMetrics.daemonThreadCount())
And I can check in my nifi-app.log file that I do have:
2017-05-12 17:43:27,639 INFO [Timer-Driven Process Thread-5] o.a.n.r.script.ScriptedReportingTask ScriptedReportingTask[id=fd1668eb-015b-1000-1974-5ef96e1f9a8b] Thread count = 29
OK… now I won’t go into the details of all the information you can access using the “context” variable but let’s try another example…
I want to send a POST request over HTTP containing a JSON representation of the summary of my root process group.
I’m not really used to Groovy so please excuse my coding style ;-). But here is a working code (available here as well):
def json = Class.forName("javax.json.Json") def httpClients = Class.forName("org.apache.http.impl.client.HttpClients") def contentType = Class.forName("org.apache.http.entity.ContentType") def status = context.getEventAccess().getControllerStatus(); def factory = json.createBuilderFactory(Collections.emptyMap()); def builder = factory.createObjectBuilder(); builder.add("componentId", status.getId()); builder.add("bytesRead", status.getBytesRead()); builder.add("bytesWritten", status.getBytesWritten()); builder.add("bytesReceived", status.getBytesReceived()); builder.add("bytesSent", status.getBytesSent()); builder.add("bytesTransferred", status.getBytesTransferred()); builder.add("flowFilesReceived", status.getFlowFilesReceived()); builder.add("flowFilesSent", status.getFlowFilesSent()); builder.add("flowFilesTransferred", status.getFlowFilesTransferred()); builder.add("inputContentSize", status.getInputContentSize()); builder.add("inputCount", status.getInputCount()); builder.add("outputContentSize", status.getOutputContentSize()); builder.add("outputCount", status.getOutputCount()); builder.add("queuedContentSize", status.getQueuedContentSize()); builder.add("activeThreadCount", status.getActiveThreadCount()); builder.add("queuedCount", status.getQueuedCount()); def requestEntity = new org.apache.http.entity.StringEntity(builder.build().toString(), contentType.APPLICATION_JSON); def httpclient = httpClients.createDefault(); def postMethod = new org.apache.http.client.methods.HttpPost("http://localhost:9999/rootStatus"); postMethod.setEntity(requestEntity); httpclient.execute(postMethod); httpclient.close();
Note – this should be improved to, for instance, properly handle potential exceptions.
To get this code working, I also have to specify the required dependencies (JSON, Apache HTTP, etc). For that, in the module directory property of the reporting task, I gave the following paths (because I’m lazy, I am pointing to much dependencies than required):
In my example I’m sending my JSON payload with a POST HTTP request to localhost on port 9999 with the path rootStatus. To receive the request, I started a ListenHttp processor with the following configuration:
Once my reporting task is started, I start receiving the information as flow files:
This scripted reporting task allows you to quickly develop proof of concept to send information to your internal systems using the interfaces you want. However, according to your needs, it might be more interesting to develop your own reporting task in Java and to build the corresponding NAR. It will give you more flexibility/options (you’ll be able to implement more interfaces) and better performances.
As usual feel free to ask questions and comment this post.