Hi,

We are using the storm-hdfs bolt (0.9.4) to write data from Kafka to Hadoop (Hadoop 2.5.0-cdh5.2.0).

This works fine for us, but we have discovered some unexpected behavior:

Our bolt uses the TimedRotationPolicy to rotate finished files from one location within HDFS to another. Unfortunately, some files remain in the "writing" location and never get rotated, as the following listing shows (I ran this command today, and our rotation interval is set to 180 seconds):


hadoop fs -ls /tmp/storm-events/valid/collecting | grep "\-25"
-rw-r--r-- 3 storm storm  20512704 2015-04-25 12:41 /tmp/storm-events/valid/collecting/events_hdfs-bolt-valid-16-2-1429965520003.txt
-rw-r--r-- 3 storm storm   5559950 2015-04-25 12:32 /tmp/storm-events/valid/collecting/events_hdfs-bolt-valid-16-270-1429965058462.txt
-rw-r--r-- 3 storm storm   4174336 2015-04-25 00:00 /tmp/storm-events/valid/collecting/events_hdfs-bolt-valid-16-769-1429916336332.txt
-rw-r--r-- 3 storm storm 125230972 2015-04-25 12:43 /tmp/storm-events/valid/collecting/events_hdfs-bolt-valid-19-0-1429965627846.txt
-rw-r--r-- 3 storm storm 115531743 2015-04-25 12:45 /tmp/storm-events/valid/collecting/events_hdfs-bolt-valid-19-0-1429965816167.txt
-rw-r--r-- 3 storm storm 106212613 2015-04-25 12:48 /tmp/storm-events/valid/collecting/events_hdfs-bolt-valid-19-0-1429965953513.txt
-rw-r--r-- 3 storm storm  25599779 2015-04-25 12:39 /tmp/storm-events/valid/collecting/events_hdfs-bolt-valid-19-1042-1429965476558.txt
-rw-r--r-- 3 storm storm  20513134 2015-04-25 12:41 /tmp/storm-events/valid/collecting/events_hdfs-bolt-valid-21-2-1429965520003.txt
-rw-r--r-- 3 storm storm   5556055 2015-04-25 12:32 /tmp/storm-events/valid/collecting/events_hdfs-bolt-valid-21-270-1429965058462.txt
-rw-r--r-- 3 storm storm   4171264 2015-04-25 00:00 /tmp/storm-events/valid/collecting/events_hdfs-bolt-valid-21-769-1429916336335.txt


If you check those files with "hadoop fsck -openforwrite", no open file handles are reported.

The Nimbus UI shows a lot of failed tuples, but only on specific workers.



The worker logs explain those failures:


tail -f worker-6704.log
2015-04-29T11:31:58.337+0000 o.a.s.h.b.HdfsBolt [WARN] write/sync failed.
org.apache.hadoop.ipc.RemoteException: java.lang.ArrayIndexOutOfBoundsException
    at org.apache.hadoop.ipc.Client.call(Client.java:1347) ~[stormjar.jar:0.1.0]
    at org.apache.hadoop.ipc.Client.call(Client.java:1300) ~[stormjar.jar:0.1.0]
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) ~[stormjar.jar:0.1.0]
    at com.sun.proxy.$Proxy8.updatePipeline(Unknown Source) ~[na:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_31]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_31]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_31]
    at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0_31]
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) ~[stormjar.jar:0.1.0]
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) ~[stormjar.jar:0.1.0]
    at com.sun.proxy.$Proxy8.updatePipeline(Unknown Source) ~[na:na]
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.updatePipeline(ClientNamenodeProtocolTranslatorPB.java:791) ~[stormjar.jar:0.1.0]
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1047) ~[stormjar.jar:0.1.0]
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:823) ~[stormjar.jar:0.1.0]
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:475) ~[stormjar.jar:0.1.0]


So it seems that the hdfs-bolt still holds an FSDataOutputStream instance that points to one of those files, and as soon as it tries to write to or rotate the file, this exception occurs. I also had a look at the hdfs-bolt implementation to see exactly how such problems are handled (https://github.com/ptgoetz/storm-hdfs):


src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java:89-118

    @Override
    public void execute(Tuple tuple) {
        try {
            [... write and/or rotate ...]
        } catch (IOException e) {
            LOG.warn("write/sync failed.", e);
            this.collector.fail(tuple);
        }
    }


This handling just fails the tuple but keeps the broken FSDataOutputStream instance. As a result, the affected hdfs-bolt instances fail every subsequent tuple. Of course, this does not result in data loss, because the tuple gets replayed and might be handled by a working instance, but it still causes some trouble :-).

Since the exception is caught inside the bolt and not propagated, we cannot handle this issue in our own implementation. One solution might be to adjust the exception handling within the hdfs-bolt so that it renews the FSDataOutputStream instance in case of an IOException - and still fails the tuple, of course. This might be useful for other cases and users as well.
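
To make the idea more concrete, here is a minimal sketch of the "renew on IOException" handling. It is not the actual HdfsBolt code: it uses plain java.io streams so that it runs without Storm or Hadoop, and the names RecoveringWriter, StreamFactory and renewStream are made up for illustration. In the bolt, the factory would presumably correspond to whatever code opens a new output file, and the boolean result to ack/fail.

```java
import java.io.IOException;
import java.io.OutputStream;

// Sketch only: hypothetical names, not part of storm-hdfs.
class RecoveringWriter {

    /** Hypothetical factory; stands in for "open a new output file". */
    interface StreamFactory {
        OutputStream open() throws IOException;
    }

    private final StreamFactory factory;
    private OutputStream out;

    RecoveringWriter(StreamFactory factory) throws IOException {
        this.factory = factory;
        this.out = factory.open();
    }

    /**
     * Writes one record. Returns true if the caller should ack the tuple,
     * false if it should fail it. On failure the stream is replaced, so the
     * next tuple does not hit the same broken stream again.
     */
    boolean write(byte[] record) {
        try {
            out.write(record);
            out.flush();
            return true;
        } catch (IOException e) {
            renewStream();
            return false; // still fail the tuple so that it gets replayed
        }
    }

    private void renewStream() {
        try {
            out.close(); // best effort; the stream is already considered broken
        } catch (IOException ignored) {
            // nothing sensible to do here
        }
        try {
            out = factory.open();
        } catch (IOException e) {
            // Opening failed as well: keep the old reference. The next write
            // is expected to fail again and trigger another renewal attempt.
        }
    }
}
```

The sketch leaves open what should happen to the abandoned file (for example, whether it should still be moved by the rotation action); that would have to be decided in the real bolt.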

The question now is whether any of you have run into a similar problem, and whether our proposed solution makes sense.

Thanks a lot and best wishes
Volker
