Thanks, but I've worked out why this is happening.

On Mon, Sep 24, 2012 at 6:01 PM, Harish Mandala <[email protected]> wrote:
> Hello,
>
> I’m having some trouble with the HDFS Event Sink. I’m using the latest
> version of flume NG, checked out today.
>
> I am using curloader to hit “MycustomSource”, which essentially takes in
> HTTP messages, and splits the content into 2 “kinds” of flume events
> (differentiated by header key-value). The first kind is sent to hdfs-sink1,
> and the second kind to hdfs-sink2 by a multiplexing selector as outlined in
> the configuration below. There’s also an hdfs-sink3 which can be ignored at
> present.
>
> I can’t really understand what’s going on. It seems related to some of the
> race condition issues outlined here:
>
> https://issues.apache.org/jira/browse/FLUME-1219
>
> Please let me know if you need more information.
>
> The following is my conf file. It is followed by flume.log.
>
> #### flume.conf ####
> agent1.channels = ch1 ch2 ch3
> agent1.sources = mycustom-source1
> agent1.sinks = hdfs-sink1 hdfs-sink2 hdfs-sink3
> # Define a memory channel called ch1 on agent1
> agent1.channels.ch1.type = memory
> agent1.channels.ch1.capacity = 200000
> agent1.channels.ch1.transactionCapacity = 20000
> agent1.channels.ch2.type = memory
> agent1.channels.ch2.capacity = 1000000
> agent1.channels.ch2.transactionCapacity = 100000
> agent1.channels.ch3.type = memory
> agent1.channels.ch3.capacity = 10000
> agent1.channels.ch3.transactionCapacity = 5000
>
> #agent1.channels.ch2.type = memory
> #agent1.channels.ch3.type = memory
>
> # Define an Mycustom custom source called mycustom-source1 on agent1 and tell it
> # to bind to 0.0.0.0:41414. Connect it to channel ch1.
> agent1.sources.mycustom-source1.channels = ch1 ch2 ch3
> agent1.sources.mycustom-source1.type = org.apache.flume.source.MycustomSource
> agent1.sources.mycustom-source1.bind = 127.0.0.1
> agent1.sources.mycustom-source1.port = 1234
> agent1.sources.mycustom-source1.serialization_method = json
> #agent1.sources.mycustom-source1.schema_filepath = /home/ubuntu/Software/flume/trunk/conf/AvroEventSchema.avpr
>
> # Define an HDFS sink
> agent1.sinks.hdfs-sink1.channel = ch1
> agent1.sinks.hdfs-sink1.type = hdfs
> agent1.sinks.hdfs-sink1.hdfs.path = hdfs://localhost:54310/user/flumeDump1
> agent1.sinks.hdfs-sink1.hdfs.filePrefix = events
> agent1.sinks.hdfs-sink1.hdfs.batchSize = 20000
> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink1.hdfs.maxOpenFiles = 10000
> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
> agent1.sinks.hdfs-sink1.hdfs.rollCount = 20000
> agent1.sinks.hdfs-sink1.hdfs.hdfs.threadsPoolSize = 20
>
> agent1.sinks.hdfs-sink2.channel = ch2
> agent1.sinks.hdfs-sink2.type = hdfs
> agent1.sinks.hdfs-sink2.hdfs.path = hdfs://localhost:54310/user/flumeDump2
> agent1.sinks.hdfs-sink2.hdfs.filePrefix = events
> agent1.sinks.hdfs-sink2.hdfs.batchSize = 100000
> agent1.sinks.hdfs-sink2.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink2.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink2.hdfs.maxOpenFiles = 10000
> agent1.sinks.hdfs-sink2.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink2.hdfs.rollInterval = 0
> agent1.sinks.hdfs-sink2.hdfs.rollCount = 100000
> agent1.sinks.hdfs-sink2.hdfs.hdfs.threadsPoolSize = 20
>
> agent1.sinks.hdfs-sink3.channel = ch3
> agent1.sinks.hdfs-sink3.type = hdfs
> agent1.sinks.hdfs-sink3.hdfs.path = hdfs://localhost:54310/user/flumeDump3
> agent1.sinks.hdfs-sink3.hdfs.filePrefix = events
> agent1.sinks.hdfs-sink3.hdfs.batchSize = 1000
> agent1.sinks.hdfs-sink3.hdfs.fileType = DataStream
> agent1.sinks.hdfs-sink3.hdfs.writeFormat = Text
> agent1.sinks.hdfs-sink3.hdfs.maxOpenFiles = 10000
> agent1.sinks.hdfs-sink3.hdfs.rollSize = 0
> agent1.sinks.hdfs-sink3.hdfs.rollInterval = 0
> agent1.sinks.hdfs-sink3.hdfs.rollCount = 1000
> agent1.sinks.hdfs-sink3.hdfs.hdfs.threadsPoolSize = 20
>
> agent1.sources.mycustom-source1.selector.type = multiplexing
> agent1.sources.mycustom-source1.selector.header = Type
> agent1.sources.mycustom-source1.selector.mapping.type1 = ch1
> agent1.sources.mycustom-source1.selector.mapping.type2 = ch2
> agent1.sources.mycustom-source1.selector.mapping.type3 = ch3
> agent1.sources.mycustom-source1.selector.default = ch1
>
> #### end of conf file ####
>
> Here are the errors from flume.log.
>
> 24 Sep 2012 21:32:13,569 WARN [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout:366) - Unexpected Exception null
> java.lang.InterruptedException
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
>     at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:257)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:119)
>     at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
>     at org.apache.flume.sink.hdfs.HDFSEventSink.flush(HDFSEventSink.java:732)
>     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>     at java.lang.Thread.run(Thread.java:679)
> 24 Sep 2012 21:32:13,572 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:450) - process failed
> java.lang.InterruptedException
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
>     at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:257)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:119)
>     at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
>     at org.apache.flume.sink.hdfs.HDFSEventSink.flush(HDFSEventSink.java:732)
>     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>     at java.lang.Thread.run(Thread.java:679)
> 24 Sep 2012 21:32:13,572 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: java.lang.InterruptedException
>     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:454)
>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>     at java.lang.Thread.run(Thread.java:679)
> Caused by: java.lang.InterruptedException
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
>     at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:257)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:119)
>     at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
>     at org.apache.flume.sink.hdfs.HDFSEventSink.flush(HDFSEventSink.java:732)
>     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
>     ... 3 more
> 24 Sep 2012 21:32:16,350 WARN [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:446) - HDFS IO error
> java.io.IOException: Filesystem closed
>     at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>     at org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:74)
>     at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3664)
>     at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
>     at org.apache.flume.sink.hdfs.HDFSDataStream.sync(HDFSDataStream.java:95)
>     at org.apache.flume.sink.hdfs.BucketWriter.doFlush(BucketWriter.java:298)
>     at org.apache.flume.sink.hdfs.BucketWriter.access$500(BucketWriter.java:50)
>     at org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:287)
>     at org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:284)
>     at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>     at org.apache.flume.sink.hdfs.BucketWriter.flush(BucketWriter.java:284)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:735)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:732)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:679)
> 24 Sep 2012 21:32:18,573 INFO [node-shutdownHook] (org.apache.flume.sink.hdfs.HDFSEventSink.stop:465) - Closing hdfs://localhost:54310/user/flumeDump2//events
> 24 Sep 2012 21:32:18,575 WARN [hdfs-hdfs-sink2-call-runner-5] (org.apache.flume.sink.hdfs.BucketWriter.doClose:259) - failed to close() HDFSWriter for file (hdfs://localhost:54310/user/flumeDump2//events.1348522332892.tmp). Exception follows.
> java.io.IOException: Filesystem closed
>     at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>     at org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:74)
>     at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3664)
>     at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
>     at org.apache.flume.sink.hdfs.HDFSDataStream.close(HDFSDataStream.java:103)
>     at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:256)
>     at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>     at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>     at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:679)
> 24 Sep 2012 21:32:18,576 WARN [node-shutdownHook] (org.apache.flume.sink.hdfs.HDFSEventSink.stop:470) - Exception while closing hdfs://localhost:54310/user/flumeDump2//events. Exception follows.
> java.io.IOException: Filesystem closed
>     at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>     at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:873)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:513)
>     at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:768)
>     at org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:375)
>     at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:275)
>     at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>     at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>     at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:679)
> 24 Sep 2012 21:32:18,589 INFO [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87) - Component type: SINK, name: hdfs-sink2 stopped
> 24 Sep 2012 21:32:18,590 INFO [node-shutdownHook] (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:82) - Stopping Sink hdfs-sink1
> 24 Sep 2012 21:32:18,590 INFO [lifecycleSupervisor-1-4] (org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run:215) - Component has already been stopped SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@49dc423f counterGroup:{ name:null counters:{runner.backoffs.consecutive=4, runner.backoffs=4, runner.deliveryErrors=1} } }
> 24 Sep 2012 21:32:18,591 INFO [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156) - Stopping component: SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@1b815bfb counterGroup:{ name:null counters:{runner.backoffs.consecutive=5, runner.backoffs=5} } }
> 24 Sep 2012 21:32:18,592 INFO [node-shutdownHook] (org.apache.flume.sink.hdfs.HDFSEventSink.stop:465) - Closing hdfs://localhost:54310/user/flumeDump1//events
> 24 Sep 2012 21:32:18,594 WARN [hdfs-hdfs-sink1-call-runner-3] (org.apache.flume.sink.hdfs.BucketWriter.doClose:259) - failed to close() HDFSWriter for file (hdfs://localhost:54310/user/flumeDump1//events.1348522332892.tmp). Exception follows.
> java.io.IOException: Filesystem closed
>     at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>     at org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:74)
>     at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3664)
>     at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
>     at org.apache.flume.sink.hdfs.HDFSDataStream.close(HDFSDataStream.java:103)
>     at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:256)
>     at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>     at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>     at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:679)
> 24 Sep 2012 21:32:18,595 WARN [node-shutdownHook] (org.apache.flume.sink.hdfs.HDFSEventSink.stop:470) - Exception while closing hdfs://localhost:54310/user/flumeDump1//events. Exception follows.
> java.io.IOException: Filesystem closed
>     at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>     at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:873)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:513)
>     at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:768)
>     at org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:375)
>     at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:275)
>     at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>     at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>     at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:679)
> 24 Sep 2012 21:32:18,600 INFO [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87) - Component type: SINK, name: hdfs-sink1 stopped
> 24 Sep 2012 21:32:18,600 INFO [node-shutdownHook] (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:92) - Stopping Channel ch3
> 24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156) - Stopping component: org.apache.flume.channel.MemoryChannel{name: ch3}
> 24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87) - Component type: CHANNEL, name: ch3 stopped
> 24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:92) - Stopping Channel ch2
> 24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156) - Stopping component: org.apache.flume.channel.MemoryChannel{name: ch2}
> 24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87) - Component type: CHANNEL, name: ch2 stopped
> 24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:92) - Stopping Channel ch1
> 24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156) - Stopping component: org.apache.flume.channel.MemoryChannel{name: ch1}
> 24 Sep 2012 21:32:18,602 INFO [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87) - Component type: CHANNEL, name: ch1 stopped
> 24 Sep 2012 21:32:18,602 INFO [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.stop:78) - Stopping lifecycle supervisor 8
> 24 Sep 2012 21:32:18,604 INFO [node-shutdownHook] (org.apache.flume.conf.file.AbstractFileConfigurationProvider.stop:91) - Configuration provider stopping
>
> Thanks,
>
> Harish
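
For anyone following the quoted configuration: the multiplexing selector there picks a channel from the value of each event's "Type" header and falls back to ch1 when nothing matches. The routing rule can be sketched in plain Java as below. This is only an illustration and not Flume's own selector code; the class name SelectorSketch and the method route are hypothetical, and the mapping is copied from the selector.* lines of the quoted flume.conf. It assumes an exact, case-sensitive match on the header value.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a multiplexing selector; not a Flume class. */
final class SelectorSketch {
    private static final String HEADER = "Type";        // selector.header
    private static final String DEFAULT_CHANNEL = "ch1"; // selector.default

    // Mirrors selector.mapping.type1..type3 from the quoted flume.conf.
    private static final Map<String, String> MAPPING = new LinkedHashMap<>();
    static {
        MAPPING.put("type1", "ch1");
        MAPPING.put("type2", "ch2");
        MAPPING.put("type3", "ch3");
    }

    /** Returns the channel name an event with these headers would go to. */
    static String route(Map<String, String> headers) {
        String value = headers.get(HEADER);
        if (value == null) {
            return DEFAULT_CHANNEL; // header missing: use the default channel
        }
        // Unmapped header values also fall back to the default channel.
        return MAPPING.getOrDefault(value, DEFAULT_CHANNEL);
    }

    public static void main(String[] args) {
        System.out.println(route(Collections.singletonMap("Type", "type2")));
        System.out.println(route(Collections.singletonMap("Type", "other")));
        System.out.println(route(Collections.<String, String>emptyMap()));
    }
}
```

With this mapping, an event whose Type header is "type2" ends up on ch2 (and so in hdfs-sink2), while an event with no Type header, or an unmapped value, lands on ch1.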
