I am not sure why the stream is being closed, but I have opened https://issues.apache.org/jira/browse/METRON-1153 because we should verify that the stream is still open before attempting to write.
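To make the idea concrete, here is a minimal sketch of one way a writer could recover from a closed stream. It is not the fix proposed in METRON-1153 and it is not Metron's SourceHandler; the names ReopeningWriter, StreamFactory and ClosableBuffer are hypothetical and exist only for illustration. Since java.io.OutputStream has no public "is open" check, the sketch treats an IOException on write as the signal, reopens the stream through a caller-supplied factory, and retries once.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical writer that reopens its stream if a write fails. */
class ReopeningWriter {
    /** Hypothetical factory; real code would open an HDFS file here. */
    interface StreamFactory {
        OutputStream open() throws IOException;
    }

    private final StreamFactory factory;
    private OutputStream out;
    private int reopenCount = 0;

    ReopeningWriter(StreamFactory factory) throws IOException {
        this.factory = factory;
        this.out = factory.open();
    }

    void write(byte[] bytes) throws IOException {
        try {
            out.write(bytes);
        } catch (IOException e) {
            // Assumption: the failure means the stream was closed underneath us.
            // Discard it, open a fresh one, and retry exactly once.
            try {
                out.close();
            } catch (IOException ignored) {
                // The old stream is already unusable; nothing more to do.
            }
            out = factory.open();
            reopenCount++;
            out.write(bytes);
        }
    }

    int reopenCount() {
        return reopenCount;
    }
}

/** Stand-in stream that fails with "Stream closed" once it has been closed. */
class ClosableBuffer extends OutputStream {
    private final ByteArrayOutputStream buf = new ByteArrayOutputStream();
    private boolean closed = false;

    @Override
    public void write(int b) throws IOException {
        if (closed) {
            throw new IOException("Stream closed");
        }
        buf.write(b);
    }

    @Override
    public void close() {
        closed = true;
    }

    String contents() {
        return buf.toString();
    }
}
```

Note that a blind retry can duplicate data if the first write partially succeeded, so a real fix would need to account for that and for file rotation.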
On September 3, 2017 at 21:28:16, Ali Nazemian ([email protected]) wrote:

Hi all,

We have run into an issue on Indexing topology on the HDFS bolt recently. We are using HDFS TDE for encryption at rest and it is working properly for 2-3 days. After that, we can see the following exception frequently on HDFS writer bolt and the throughput of this topology drops significantly. FYI, this error will disappear after restarting "indexing" topology.

o.a.m.w.BulkWriterComponent [ERROR] Failing 51 tuples
java.io.IOException: Stream closed
    at org.apache.hadoop.crypto.CryptoOutputStream.checkStream(CryptoOutputStream.java:250) ~[stormjar.jar:?]
    at org.apache.hadoop.crypto.CryptoOutputStream.write(CryptoOutputStream.java:133) ~[stormjar.jar:?]
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58) ~[stormjar.jar:?]
    at java.io.DataOutputStream.write(DataOutputStream.java:107) ~[?:1.8.0_131]
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97) ~[?:1.8.0_131]
    at org.apache.metron.writer.hdfs.SourceHandler.handle(SourceHandler.java:74) ~[stormjar.jar:?]
    at org.apache.metron.writer.hdfs.HdfsWriter.write(HdfsWriter.java:113) ~[stormjar.jar:?]
    at org.apache.metron.writer.BulkWriterComponent.flush(BulkWriterComponent.java:239) [stormjar.jar:?]
    at org.apache.metron.writer.BulkWriterComponent.flushTimeouts(BulkWriterComponent.java:281) [stormjar.jar:?]
    at org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:211) [stormjar.jar:?]
    at org.apache.storm.daemon.executor$fn__6573$tuple_action_fn__6575.invoke(executor.clj:734) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__6494.invoke(executor.clj:469) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
    at org.apache.storm.disruptor$clojure_handler$reify__6007.onEvent(disruptor.clj:40) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
    at org.apache.storm.daemon.executor$fn__6573$fn__6586$fn__6639.invoke(executor.clj:853) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
    at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.1.2.5.6.0-40.jar:1.0.1.2.5.6.0-40]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]

Cheers,
Ali
