Hi, are you ingesting streaming data?

2016-12-02 10:56 GMT+01:00 Roberto Coluccio <[email protected]>:
> Hello again guys,
>
> Sorry to bother you; I'm fairly new to the Flume world and I'm running
> experiments to evaluate the pros and cons of different topologies and settings.
>
> I'm currently experiencing the following issue.
>
> Agent topology: JMS source --> Kafka channel --> HDFS sink
> Source and sink batch size, and channel transactionCapacity = 1000
> Channel capacity = 10000
> Channel kafka.consumer.auto.offset.reset = latest
> Channel migrateZookeeperOffsets = false
>
> I'm trying to confirm that the (channel, sink) pair is fault tolerant, and
> that such an agent can be restarted after being terminated, resuming its
> draining of the channel from the point (offset) where it left off.
>
> I have a CDH 5.8 cluster and launch my agent with a script that calls
> flume-ng, passing it a custom configuration file (see the configuration
> sketch after this message).
>
> Given x messages previously pushed to the input JMS queue, I:
>
> 1. start the agent through my script;
> 2. verify that it opens a new file on HDFS and starts consuming events
>    (it writes 1 event per line);
> 3. stop the agent (CTRL+C in the open shell session);
> 4. restart the agent through my script;
> 5. wait until it completes its draining;
> 6. count the lines written across all the generated files.
>
> What I experienced is the following:
>
> 1. When the script/process termination at step 3 is graceful (i.e. no
>    exceptions are raised), I verify that exactly x messages were written
>    to HDFS.
>
> 2. When the script/process termination at step 3 is followed by this
>    exception:
>
> 16/12/02 09:19:46 WARN jms.JMSSource: JMSException consuming events
> javax.jms.JMSException: InterruptedException has occurred while waiting for server response
>   at com.tibco.tibjms.Tibjmsx.buildException(Tibjmsx.java:502)
>   at com.tibco.tibjms.TibjmsxLink.sendRequest(TibjmsxLink.java:364)
>   at com.tibco.tibjms.TibjmsxLink.sendRequestMsg(TibjmsxLink.java:293)
>   at com.tibco.tibjms.TibjmsxSessionImp._processNoWait(TibjmsxSessionImp.java:3548)
>   at com.tibco.tibjms.TibjmsxSessionImp._receive(TibjmsxSessionImp.java:1947)
>   at com.tibco.tibjms.TibjmsMessageConsumer._receive(TibjmsMessageConsumer.java:240)
>   at com.tibco.tibjms.TibjmsMessageConsumer.receiveNoWait(TibjmsMessageConsumer.java:492)
>   at org.apache.flume.source.jms.JMSMessageConsumer.take(JMSMessageConsumer.java:127)
>   at org.apache.flume.source.jms.JMSSource.doProcess(JMSSource.java:261)
>   at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:58)
>   at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:137)
>   at java.lang.Thread.run(Thread.java:745)
>
> then I again verify that exactly x messages were written to HDFS.
>
> 3. When the script/process termination at step 3 is followed by this
>    exception:
>
> ^C16/12/02 09:10:28 INFO lifecycle.LifecycleSupervisor: Stopping lifecycle supervisor 11
> 16/12/02 09:10:28 ERROR hdfs.HDFSEventSink: process failed
> java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
>   at org.apache.flume.sink.hdfs.BucketWriter.checkAndThrowInterruptedException(BucketWriter.java:660)
>   at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:483)
>   at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:418)
>   at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>   at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>   at java.lang.Thread.run(Thread.java:745)
> 16/12/02 09:10:28 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
>   at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
>   at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>   at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
>   at org.apache.flume.sink.hdfs.BucketWriter.checkAndThrowInterruptedException(BucketWriter.java:660)
>   at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:483)
>   at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:418)
>   ... 3 more
> 16/12/02 09:10:33 INFO hdfs.HDFSEventSink: Closing ...
>
> .... logs continue ... then:
>
> 16/12/02 09:10:34 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: chl_jms_kafka. channel.rollback.count == <aNumberGreaterThanZero>
>
> then I find my initial x messages written to HDFS *PLUS* <aNumberGreaterThanZero> more!
>
> This behaviour is really frustrating and I don't understand how to avoid
> these duplicates. As a side note, the same experiment with a brutal agent
> termination at step 3 (kill -9 on the process's PID) does not produce
> duplicates!
>
> I would appreciate any help on this (for me, crucial) topic.
>
> Thank you,
>
> Roberto
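For reference, below is a minimal sketch of a Flume agent configuration matching the topology described in the message above (JMS source --> Kafka channel --> HDFS sink). It is an illustration under assumptions, not the original file: the agent and component names (apart from the channel name chl_jms_kafka, which appears in the shutdown metrics), the EMS and Kafka hosts, the queue/topic names, and the HDFS path are placeholders, and the channel capacity/transactionCapacity values mentioned above are omitted. Only the batch sizes and the two kafka.* channel properties are taken from the message.

# Sketch only: placeholder names, hosts, and paths.
agent.sources  = src_jms
agent.channels = chl_jms_kafka
agent.sinks    = snk_hdfs

# JMS source (TIBCO EMS, as suggested by the stack trace above)
agent.sources.src_jms.type = jms
agent.sources.src_jms.initialContextFactory = com.tibco.tibjms.naming.TibjmsInitialContextFactory
agent.sources.src_jms.connectionFactory = QueueConnectionFactory
agent.sources.src_jms.providerURL = tcp://ems-host:7222
agent.sources.src_jms.destinationType = QUEUE
agent.sources.src_jms.destinationName = input.queue
agent.sources.src_jms.batchSize = 1000
agent.sources.src_jms.channels = chl_jms_kafka

# Kafka channel
agent.channels.chl_jms_kafka.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.chl_jms_kafka.kafka.bootstrap.servers = kafka-host:9092
agent.channels.chl_jms_kafka.kafka.topic = flume-channel-topic
agent.channels.chl_jms_kafka.kafka.consumer.group.id = flume-agent
agent.channels.chl_jms_kafka.kafka.consumer.auto.offset.reset = latest
agent.channels.chl_jms_kafka.migrateZookeeperOffsets = false

# HDFS sink (DataStream so each event body is written as one text line)
agent.sinks.snk_hdfs.type = hdfs
agent.sinks.snk_hdfs.channel = chl_jms_kafka
agent.sinks.snk_hdfs.hdfs.path = /flume/events
agent.sinks.snk_hdfs.hdfs.fileType = DataStream
agent.sinks.snk_hdfs.hdfs.batchSize = 1000
agent.sinks.snk_hdfs.hdfs.rollInterval = 300
agent.sinks.snk_hdfs.hdfs.rollSize = 0
agent.sinks.snk_hdfs.hdfs.rollCount = 0
agent.sinks.snk_hdfs.hdfs.callTimeout = 10000

One general note on the behaviour described: a sink transaction that is rolled back (the non-zero channel.rollback.count in the shutdown metrics) is redelivered from the channel when the agent resumes, since Flume's channel transactions give at-least-once rather than exactly-once delivery. That would be consistent with duplicates appearing whenever the sink is interrupted after writing to HDFS but before its transaction commits.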
