Debugging Hadoop WebHDFS API

Last week, I found myself unable to use the WebHDFS REST API through an ETL tool. The only error message I got was:

HTTP 400 Bad Request

Looking at the WebHDFS documentation, I understood that there were only two possible exceptions behind this error:

  • IllegalArgumentException
  • UnsupportedOperationException

Not really helpful, so I decided to make the API calls myself with curl… and here is what I got with a simple query to list the files at the root of HDFS as user “123”:

$ curl -i "http://mynode:50070/webhdfs/v1/?op=LISTSTATUS&user.name=123"
HTTP/1.1 400 Bad Request
Content-Type: application/json; charset=utf-8
Content-Length: 209
Connection: close

{"RemoteException":{"exception":"IllegalArgumentException","javaClassName":"java.lang.IllegalArgumentException","message":"Invalid value: \"123\" does not belong to the domain ^[A-Za-z_][A-Za-z0-9._-]*[$]?$"}}

Conclusion: the user name passed in the query is checked against a regular expression and, if it does not match, the above exception is returned. The default regular expression is:

^[A-Za-z_][A-Za-z0-9._-]*[$]?$

I was unable to use WebHDFS because my user name started with digits… I understand this is kind of an edge case: such user names are rare, but still… I couldn’t believe I was blocked because of this.

After a quick search, I found HDFS-4983, which exposes a property in the HDFS configuration (since Apache Hadoop 2.3.0) to change the default regular expression. Great! I changed the property (dfs.webhdfs.user.provider.user.pattern), restarted my HDFS service and tested my curl request. The request was successful, so I restarted my ETL workflow… and… got the same error! Kind of unexpected…
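For the record, changing this property is just a matter of adding an entry in hdfs-site.xml. The relaxed pattern below (which additionally allows a leading digit) is only an illustration, not necessarily the value you should use:

<property>
  <!-- example only: also accepts user names starting with a digit -->
  <name>dfs.webhdfs.user.provider.user.pattern</name>
  <value>^[A-Za-z0-9_][A-Za-z0-9._-]*[$]?$</value>
</property>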

Back to basics: making curl requests… My ETL workflow was just trying to load a file into HDFS, so let’s do that manually:

$ curl -i -X PUT "http://mynode:50070/webhdfs/v1/tmp/test.txt?op=CREATE&user.name=123"
HTTP/1.1 307 TEMPORARY_REDIRECT
Cache-Control: no-cache
Expires: Sat, 04 Feb 2017 10:19:38 GMT
Date: Sat, 04 Feb 2017 10:19:38 GMT
Pragma: no-cache
Expires: Sat, 04 Feb 2017 10:19:38 GMT
Date: Sat, 04 Feb 2017 10:19:38 GMT
Pragma: no-cache
X-FRAME-OPTIONS: SAMEORIGIN
Set-Cookie: hadoop.auth="u=123&p=123&t=simple&e=1486239578624&s=UrzCjP0SPpPKDJnSYB5BsKuQVKc="; Path=/; HttpOnly
Location: http://mynode:50075/webhdfs/v1/tmp/test.txt?op=CREATE&user.name=123&namenoderpcaddress=mynode:8020&createflag=&createparent=true&overwrite=false
Content-Type: application/octet-stream
Content-Length: 0

$ curl -i -X PUT -T test.txt "http://mynode:50075/webhdfs/v1/tmp/test.txt?op=CREATE&user.name=123&namenoderpcaddress=mynode:8020&createflag=&createparent=true&overwrite=false"
HTTP/1.1 400 Bad Request
Content-Type: application/json; charset=utf-8
Content-Length: 209
Connection: close

{"RemoteException":{"exception":"IllegalArgumentException","javaClassName":"java.lang.IllegalArgumentException","message":"Invalid value: \"123\" does not belong to the domain ^[A-Za-z_][A-Za-z0-9._-]*[$]?$"}}

As explained here, the first step is to make a request against the Name Node to get the address of a Data Node where the file data is to be written. Then a second request is made directly against that Data Node to actually send the data (in this case, my test.txt file). And… here is the error again!

So it seems that HDFS-4983 only fixes the check for requests made against the Name Node web interface, but not for calls made to the web interface running on a Data Node… I checked out the code (be careful to check out the code corresponding to the exact version you are running, otherwise attaching a debugger won’t be helpful; in my case I got the sources from the Hortonworks repositories) and imported it into my Eclipse IDE. A quick search for occurrences of the regular expression led me to the UserParam class, where I found a static import:

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT;

And:

private static Domain domain = new Domain(NAME, Pattern.compile(DFS_WEBHDFS_USER_PATTERN_DEFAULT));

This means that if this domain attribute is not overridden somewhere, the default regular expression is used. It really looks like what we are experiencing!

Just to confirm my assumption, I modified the bootstrap of the Data Node JVM to attach a debugger, by adding the following parameter when the JVM is launched:

-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=9999
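How exactly you add this parameter depends on your distribution; as a sketch, if you can edit hadoop-env.sh directly (with a managed distribution, this usually goes through the management UI), appending it to HADOOP_DATANODE_OPTS does the job:

# hadoop-env.sh - enable remote debugging on the Data Node (example)
export HADOOP_DATANODE_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=9999 ${HADOOP_DATANODE_OPTS}"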

This way, in Eclipse, I can launch a remote debugger pointing to my Data Node on port 9999 (ensure this port is open and available) and define all the breakpoints I want. In this case, I set a breakpoint in the channelRead0 method of the WebHdfsHandler class (which is the entry point of the HTTP listener running in the Data Node). I could then confirm that when the user group information is retrieved:

ugi = ugiProvider.ugi();

The user name is checked against the default regular expression. By looking at the patch in HDFS-4983, it was easy to figure out what needed to be added in the code to have the property correctly taken into account in the Data Node web handler. And that’s how I created HDFS-11391 and the associated pull request.
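Without quoting the actual patch, the idea is simply to do on the Data Node side what HDFS-4983 already does on the Name Node side; as a sketch (not the exact code of the pull request), something along these lines during the initialization of the Data Node web server:

// sketch: propagate the configured pattern to UserParam when the Data Node web server starts
UserParam.setUserPattern(conf.get(
    DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
    DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));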

One line of code to fix this issue. Sometimes it’s not that easy. Anyway, it gave me the opportunity to contribute to the Apache Hadoop project… Sweet.


HAProxy load balancing in front of Apache NiFi

There are a lot of situations where load balancing is necessary, even more so when you have a clustered architecture. In the context of NiFi, you may want to consider two situations:

  • You have a NiFi cluster and you don’t want to give users the IP addresses of your NiFi nodes to access the UI (remember: every node of the cluster can be used for the UI or for REST API calls), and you want to load balance users across all the nodes of the cluster. In addition, if you scale your cluster up or down, you don’t want to have to inform all the users of such a change.
  • You have Listen[X] processors (HTTP, TCP, UDP, Syslog, etc.) in your workflow that are not running on the primary node only, and you want all the nodes to receive the data (to increase performance and have full scalability). Again, you don’t want to configure the data senders with all the IPs of your NiFi nodes, and in case of a cluster change you don’t want to have to make changes on the client side.

In such situations, you need a load balancer standing in front of your NiFi cluster and providing a VIP (Virtual IP). For this purpose, you have hardware options, software options and also cloud options. Keep in mind that, for a production setup, your load balancer needs to be deployed in a highly available fashion, otherwise it becomes a single point of failure.

In this article, I’ll use HAProxy, which is the most widely used open source software-based load balancing solution. And I’ll launch my HAProxy instance in a Docker container (I’ll also use Portainer as a UI to help me manage my Docker environment).


I assume I already have my secured 3-node NiFi cluster up and running, and that I’ve already set up my Docker and Portainer environment: I’m ready to pull the HAProxy Docker image and create my container.

Let’s start with my HAProxy instance. First of all, in Portainer, I just need to pull the image I want (in this case I want the latest version, so I just need to enter haproxy):

Screen Shot 2017-02-08 at 12.23.09 PM.png

I now have the image available:

screen-shot-2017-02-08-at-12-24-14-pm

Let’s build a container. In order to ease the modification of the HAProxy configuration file, I’ll define a bind mount so that the configuration file inside the container is bound to a file on my computer. I also need to define all the ports I want to expose on my container. Since my container will run on one of the nodes of my cluster, I don’t want to reuse the ports already used by NiFi (but in practice, your HAProxy would probably run on its own host). Here is what I need (the command-line equivalent is sketched right after this list):

  • Expose my container on 9443: I’ll configure HAProxy to listen on 9443 and to redirect requests made on 9443 to my NiFi nodes on port 8443 (the port of my NiFi UI)
  • Expose my container on 9999: I’ll configure HAProxy to listen on 9999 and to redirect requests made on 9999 to my NiFi nodes on port 8888 (the port I’ll use for my ListenHTTP processors)
  • Expose my container on 1936 and map it to port 1936 inside my container (that’s the port used by the HAProxy statistics UI)
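If you are not using Portainer, the equivalent docker run command would look something like this (the host path to haproxy.cfg is just an example, adjust it to your environment; the official haproxy image expects its configuration at /usr/local/etc/haproxy/haproxy.cfg):

$ docker run -d --name nifi-haproxy \
    -p 9443:9443 -p 9999:9999 -p 1936:1936 \
    -v /home/myuser/haproxy/haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro \
    haproxy:latest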

In Portainer, I add a container and define the following:

Screen Shot 2017-02-08 at 2.42.02 PM.png

When I create my container, it immediately stops because I haven’t written the configuration file yet, and the container won’t start if the configuration file is invalid or missing. You can check the logs of your container in Portainer in case of issues.

In our case, we’ll have HTTP access to send data to our ListenHTTP processors and HTTPS access for the UI. For HTTPS, I don’t want my load balancer to take care of certificates; I just want the requests to go through (pure TCP load balancing). This raises an issue for users accessing the UI, because the certificate presented by the NiFi node won’t match the address the client is requesting (the load balancer address). This could be solved by adding a SAN (Subject Alternative Name) to the certificate of the nodes, but that will be discussed in another article (in the meantime, you can have a look at NIFI-3331). In this case (and just because this is a demo!), I’ll accept the SSL exceptions in my browser.

OK so let’s see my HAProxy configuration file:

global

defaults
    log     global
    mode    http
    option  httplog
    option  dontlognull
    timeout connect 5000
    timeout client  50000
    timeout server  50000

frontend nifi-ui
    bind *:9443
    mode tcp
    default_backend nodes-ui

backend nodes-ui
    mode tcp
    balance roundrobin
    stick-table type ip size 200k expire 30m
    stick on src
    option httpchk HEAD / HTTP/1.1\r\nHost:localhost
    server nifi01 node-1:8443 check check-ssl verify none
    server nifi02 node-2:8443 check check-ssl verify none
    server nifi03 node-3:8443 check check-ssl verify none

frontend nifi-listen-http
    bind *:9999
    mode http
    default_backend nodes-http

backend nodes-http
    mode http
    balance roundrobin
    option forwardfor
    http-request set-header X-Forwarded-Port %[dst_port]
    option httpchk HEAD / HTTP/1.1\r\nHost:localhost
    server nifi01 node-1:8888 check
    server nifi02 node-2:8888 check
    server nifi03 node-3:8888 check

listen stats
    bind *:1936
    mode http
    stats enable
    stats uri /
    stats hide-version
    stats auth admin:password

Let’s see the different parts:

  • global – I put nothing in here.
  • defaults – Just some timeouts and log configuration, nothing special.
  • frontend nifi-ui – here I define a front end called “nifi-ui”. This is where I define the port HAProxy should listen on (in this case port 9443, in TCP mode) and where the requests should be redirected (in this case to my back end called nodes-ui).
  • backend nodes-ui – here I define the back end to which the requests received by my front end are redirected. I define TCP mode, round-robin load balancing, stickiness (to ensure that a connected user, based on their IP, remains on the same node over multiple requests), and the available nodes. I also define the health check performed by HAProxy to determine whether nodes are up or down. In this case, it’s an HTTP HEAD request and I don’t verify the SSL certificates.
  • frontend nifi-listen-http – here I define the front end that will be used to send data to my ListenHTTP processors on port 9999.
  • backend nodes-http – here I define the back end with my servers, the HTTP mode, the health check, and also the addition of two HTTP headers (X-Forwarded-Port and X-Forwarded-For) to keep track of the IP and port of my client (otherwise I’d only see the IP and port of my load balancer when receiving data in NiFi).
  • listen stats – here are some parameters for the HAProxy management UI, like the port, the login and the password.

I restart my container to take the configuration into account, and I can have a look at the management UI:

screen-shot-2017-02-08-at-4-37-45-pm

Everything is green, meaning that our health checks are OK.

Assuming my load balancer has the host name “my-nifi-vip”, I can now access the UI through https://my-nifi-vip:9443/nifi:

screen-shot-2017-02-08-at-4-41-53-pm

On my UI, I can configure a ListenHTTP processor as below:

Screen Shot 2017-02-08 at 4.43.14 PM.png

screen-shot-2017-02-08-at-4-45-05-pm

And send data to my cluster using the virtual IP provided by my load balancer:

while true; do curl -X POST http://my-nifi-vip:9999/test; done;

In the HAProxy management UI, I can confirm that my requests are correctly load balanced on all the nodes of my cluster:

screen-shot-2017-02-08-at-5-17-01-pm

It is now really easy to add or remove nodes in your NiFi cluster without impacting the clients sending data into NiFi, since they only need to know about the virtual IP exposed by the load balancer. When adding nodes, you just need to update the configuration file and restart your HAProxy container.
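For example, adding a hypothetical fourth node (node-4) would just mean adding one server line per back end, following the same pattern as the configuration above:

    # in backend nodes-ui
    server nifi04 node-4:8443 check check-ssl verify none

    # in backend nodes-http
    server nifi04 node-4:8888 check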

Keep in mind that even if the data is correctly load balanced over the nodes, all the requests go through a single point: the load balancer. Consequently, from a performance standpoint, your load balancer may become a bottleneck if you need to handle a very large number of connections per second. However, load balancers are designed to be as efficient as possible and you should be OK in most cases.

As always questions/comments are welcomed.


Using counters in Apache NiFi

You may not know it, but you have the ability to define and play with counters in NiFi. If policies are correctly configured (in case your NiFi is secured), you should be able to access the existing counters using the menu:

Screen Shot 2017-02-07 at 5.33.59 PM.png

Counters are just values that you can increase or decrease by a given delta. This is useful if you want to monitor particular values along your workflow. At the moment, unless you use a processor that explicitly uses counters or provides a way to define counters, there is nothing available out of the box.

The best way to define and update counters is to use the ExecuteScript processor with the following piece of Groovy code:

def flowFile = session.get()
if(!flowFile) return
session.adjustCounter("my-counter", 1, true)
session.transfer(flowFile, REL_SUCCESS)

With this example, the ExecuteScript processor will transmit the flow file to the success relationship without any modification, but will also increment the counter “my-counter” by 1. If this counter does not exist, it will be initialized with the delta value given as an argument.

Here is the documentation of this method:

    /**
     * Adjusts counter data for the given counter name and takes care of
     * registering the counter if not already present. The adjustment occurs
     * only if and when the ProcessSession is committed.
     *
     * @param name the name of the counter
     * @param delta the delta by which to modify the counter (+ or -)
     * @param immediate if true, the counter will be updated immediately,
     *            without regard to whether the ProcessSession is commit or rolled back;
     *            otherwise, the counter will be incremented only if and when the
     *            ProcessSession is committed.
     */
    void adjustCounter(String name, long delta, boolean immediate);

Let’s see an example: I want to confirm the correct behavior of the GetHDFS processor when I have multiple instances of this processor looking into the same directory but getting different flow files based on a regular expression.

Here is the first part of my flow:

screen-shot-2017-02-07-at-6-20-52-pm

Basically, I am generating flow files every 1 ms with GenerateFlowFile. The generated flow files are named after the generation timestamp (without any extension). I am sending the files into HDFS and then using a RouteOnAttribute processor where I check the filename against a regular expression to split the files between even and uneven names (see the example routing configuration below). This way, I can increment counters tracking the number of files I sent to HDFS with even names and with uneven names.
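Since the generated file names are pure timestamps (digits only), the routing can simply look at the last digit. The property names below are mine and the expressions are just one possible way to do it in RouteOnAttribute:

even      ${filename:matches('.*[02468]')}
uneven    ${filename:matches('.*[13579]')}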

Here is the second part of my flow:

Screen Shot 2017-02-07 at 6.27.28 PM.png

I have two instances of the GetHDFS processor configured to look into the same input directory, but one with a regular expression matching files with an even name and one matching files with an uneven name; this way, there is no concurrent access on the same files. Besides, the processors are configured to delete the files on HDFS once they are retrieved in NiFi. Then I update two different counters: one to track the number of files with an even name that I retrieved from HDFS, and one for the files with an uneven name (an example configuration is shown below).
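Concretely, the only difference between the two GetHDFS instances is the File Filter Regex property; reusing the same example regular expressions as above, that would be:

File Filter Regex = .*[02468]    (instance retrieving the files with an even name)
File Filter Regex = .*[13579]    (instance retrieving the files with an uneven name)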

If everything is working correctly, I should be able to let my workflow run for a bit, then stop the generation of flow files, wait for all the flow files to be processed, and confirm that:

  • the even-producer counter is equal to the even-consumer counter
  • the uneven-producer counter is equal to the uneven-consumer counter

Let’s have a look at our counters table:

screen-shot-2017-02-07-at-6-47-02-pm

It looks like we are all good 😉

As a remark, if you have multiple processors updating the same counter, you will see the global value of the counter but also the value at each processor level. For example, if I have:

screen-shot-2017-02-07-at-6-53-09-pm

With both ExecuteScript processors incrementing the same “test” counter, I’ll have:

screen-shot-2017-02-07-at-6-55-48-pm

Also, as a last remark, note that it’s possible to reset a counter to 0 from the counters table, using the button in the last column (assuming you have write access to the counters based on the defined policies). This can be useful when doing tests.

As always, questions/comments are welcomed!