It looks like the deserializer you are using is throwing the exception causing the source to not work. Also spool dir source can work with only one directory, so it is inlet reading files from one of the specified directories, not all three of them.
On Sunday, August 30, 2015, Nikhil Gs <[email protected]> wrote: > Hello Team, > > Thank you in advance for your cooperation and time. > > Facing this issue several times. Tried to restart flume and build the jar. > But files are not deleting from the flume spool directory and below is my > error and flume config file. > > Any suggestions would be great helpful. It is urgent because of production. > > > # Please paste flume.conf here. Example: > > # Sources, channels, and sinks are defined per > # agent name, in this case 'pnmtest2'. > pnmtest2.sources = SPOOL > pnmtest2.channels = MemChanneltest2 > pnmtest2.sinks = AVRO > > # For each source, channel, and sink, set > # standard properties. > pnmtest2.sources.SPOOL.type = spooldir > pnmtest2.sources.SPOOL.spoolDir = /home/a_nikhil.gopishetti/pnm > #pnmtest2.sources.SPOOL.spoolDir = > /home/s_sdldalplhdxxxedh/pnmtest2-poll-results > #pnmtest2.sources.SPOOL.spoolDir = > /home/s_sdldalplhdxxxedh/pnm-poll-results-uat > pnmtest2.sources.SPOOL.ignorePattern = \.*tmp$ > pnmtest2.sources.SPOOL.channels = MemChanneltest2 > pnmtest2.sources.SPOOL.fileHeader = true > pnmtest2.sources.SPOOL.deletePolicy = immediate > pnmtest2.sources.SPOOL.consumeOrder = oldest > pnmtest2.sources.SPOOL.batchSize = 100 > > pnmtest2.sources.SPOOL.interceptors = time > pnmtest2.sources.SPOOL.interceptors.time.type = > org.apache.flume.interceptor.TimestampInterceptor$Builder > pnmtest2.sources.SPOOL.deserializer = > com.suddenlink.flume.WholeFileDeserializer$Builder > > pnmtest2.sinks.AVRO.type = avro > pnmtest2.sinks.AVRO.channel = MemChanneltest2 > pnmtest2.sinks.AVRO.hostname = sdldalplhdw02.suddenlink.cequel3.com > pnmtest2.sinks.AVRO.port = 40002 > pnmtest2.sinks.AVRO.batch-size = 100 > pnmtest2.sinks.AVRO.connect-timeout = 40000 > > #pnmtest2.channels.MemChanneltest1.capacity = 10000 > #pnmtest2.channels.MemChanneltest1.type = memory > > pnmtest2.channels.MemChanneltest2.capacity = 1000000 > pnmtest2.channels.MemChanneltest2.type = memory > > > *Flume Log Error* > > > 12:02:57.363 PMINFOorg.mortbay.log > > Started [email protected]:41414 > > 12:02:57.621 PMINFOorg.apache.flume.sink.AbstractRpcSink > > Rpc sink AVRO started. > > 12:07:30.095 PMERRORorg.apache.flume.source.SpoolDirectorySource > > *FATAL: Spool Directory source SPOOL: { spoolDir: > /home/a_nikhil.gopishetti/pnm/ }: Uncaught exception in SpoolDirectorySource > thread. Restart or reconfigure Flume to continue processing. > java.lang.RuntimeException: WholeFileDeserializer creates one event for the > whole file > at > com.suddenlink.flume.WholeFileDeserializer.readEvents(WholeFileDeserializer.java:59) > at > org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:252) > at > org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:228) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745)* > > 1:12:31.161 PMINFOorg.apache.flume.lifecycle.LifecycleSupervisor > > Stopping lifecycle supervisor 11 > > 1:12:31.163 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Component type: CHANNEL, name: MemChanneltest2 stopped > > 1:12:31.163 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.start.time > == 1440954177242 > > 1:12:31.163 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.stop.time > == 1440958351163 > > 1:12:31.163 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.capacity == > 1000000 > > 1:12:31.163 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: CHANNEL, name: MemChanneltest2. > channel.current.size == 0 > > 1:12:31.164 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: CHANNEL, name: MemChanneltest2. > channel.event.put.attempt == 0 > > 1:12:31.164 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: CHANNEL, name: MemChanneltest2. > channel.event.put.success == 0 > > 1:12:31.164 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: CHANNEL, name: MemChanneltest2. > channel.event.take.attempt == 523 > > 1:12:31.164 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: CHANNEL, name: MemChanneltest2. > channel.event.take.success == 0 > > 1:12:31.164 > PMINFOorg.apache.flume.node.PollingPropertiesFileConfigurationProvider > > Configuration provider stopping > > 1:12:31.164 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Component type: SOURCE, name: SPOOL stopped > > 1:12:31.164 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: SOURCE, name: SPOOL. source.start.time == > 1440954177279 > > 1:12:31.164 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: SOURCE, name: SPOOL. source.stop.time == > 1440958351164 > > 1:12:31.164 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: SOURCE, name: SPOOL. src.append-batch.accepted == 0 > > 1:12:31.165 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: SOURCE, name: SPOOL. src.append-batch.received == 0 > > 1:12:31.165 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: SOURCE, name: SPOOL. src.append.accepted == 0 > > 1:12:31.165 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: SOURCE, name: SPOOL. src.append.received == 0 > > 1:12:31.165 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: SOURCE, name: SPOOL. src.events.accepted == 0 > > 1:12:31.165 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: SOURCE, name: SPOOL. src.events.received == 0 > > 1:12:31.165 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: SOURCE, name: SPOOL. src.open-connection.count == 0 > > 1:12:31.165 PMINFOorg.apache.flume.source.SpoolDirectorySource > > SpoolDir source SPOOL stopped. Metrics: SOURCE:SPOOL{src.events.accepted=0, > src.events.received=0, src.append.accepted=0, src.append-batch.accepted=0, > src.open-connection.count=0, src.append-batch.received=0, > src.append.received=0} > > 1:12:31.165 PMINFOorg.apache.flume.sink.AbstractRpcSink > > Rpc sink AVRO stopping... > > 1:12:31.170 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Component type: SINK, name: AVRO stopped > > 1:12:31.170 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: SINK, name: AVRO. sink.start.time == 1440954177243 > > 1:12:31.170 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: SINK, name: AVRO. sink.stop.time == 1440958351170 > > 1:12:31.170 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: SINK, name: AVRO. sink.batch.complete == 0 > > 1:12:31.170 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: SINK, name: AVRO. sink.batch.empty == 523 > > 1:12:31.171 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: SINK, name: AVRO. sink.batch.underflow == 0 > > 1:12:31.171 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: SINK, name: AVRO. sink.connection.closed.count == 1 > > 1:12:31.171 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: SINK, name: AVRO. sink.connection.creation.count == > 1 > > 1:12:31.171 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: SINK, name: AVRO. sink.connection.failed.count == 0 > > 1:12:31.171 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: SINK, name: AVRO. sink.event.drain.attempt == 0 > > 1:12:31.171 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Shutdown Metric for type: SINK, name: AVRO. sink.event.drain.sucess == 0 > > 1:12:31.171 PMINFOorg.apache.flume.sink.AbstractRpcSink > > Rpc sink AVRO stopped. Metrics: SINK:AVRO{sink.connection.closed.count=1, > sink.event.drain.attempt=0, sink.batch.underflow=0, > sink.connection.failed.count=0, sink.connection.creation.count=1, > sink.event.drain.sucess=0, sink.batch.empty=523, sink.batch.complete=0} > > 1:12:31.171 PMINFOorg.mortbay.log > > Stopped [email protected]:41414 > > 1:12:46.490 > PMINFOorg.apache.flume.node.PollingPropertiesFileConfigurationProvider > > Configuration provider starting > > 1:12:46.513 > PMINFOorg.apache.flume.node.PollingPropertiesFileConfigurationProvider > > Reloading configuration > file:/var/run/cloudera-scm-agent/process/5683-flume-AGENT/flume.conf > > 1:12:46.520 PMINFOorg.apache.flume.conf.FlumeConfiguration > > Processing:AVRO > > 1:12:46.521 PMINFOorg.apache.flume.conf.FlumeConfiguration > > Processing:AVRO > > 1:12:46.521 PMINFOorg.apache.flume.conf.FlumeConfiguration > > Processing:AVRO > > 1:12:46.521 PMINFOorg.apache.flume.conf.FlumeConfiguration > > Processing:AVRO > > 1:12:46.522 PMINFOorg.apache.flume.conf.FlumeConfiguration > > Added sinks: AVRO Agent: pnmtest2 > > 1:12:46.522 PMINFOorg.apache.flume.conf.FlumeConfiguration > > Processing:AVRO > > 1:12:46.522 PMINFOorg.apache.flume.conf.FlumeConfiguration > > Processing:AVRO > > 1:12:46.554 PMINFOorg.apache.flume.conf.FlumeConfiguration > > Post-validation flume configuration contains configuration for agents: > [pnmtest2] > > 1:12:46.554 PMINFOorg.apache.flume.node.AbstractConfigurationProvider > > Creating channels > > 1:12:46.564 PMINFOorg.apache.flume.channel.DefaultChannelFactory > > Creating instance of channel MemChanneltest2 type memory > > 1:12:46.569 PMINFOorg.apache.flume.node.AbstractConfigurationProvider > > Created channel MemChanneltest2 > > 1:12:46.570 PMINFOorg.apache.flume.source.DefaultSourceFactory > > Creating instance of source SPOOL, type spooldir > > 1:12:46.597 PMINFOorg.apache.flume.sink.DefaultSinkFactory > > Creating instance of sink: AVRO, type: avro > > 1:12:46.604 PMINFOorg.apache.flume.sink.AbstractRpcSink > > Connection reset is set to 0. Will not reset connection to next hop > > 1:12:46.606 PMINFOorg.apache.flume.node.AbstractConfigurationProvider > > Channel MemChanneltest2 connected to [SPOOL, AVRO] > > 1:12:46.615 PMINFOorg.apache.flume.node.Application > > Starting new configuration:{ sourceRunners:{SPOOL=EventDrivenSourceRunner: { > source:Spool Directory source SPOOL: { spoolDir: > /home/a_nikhil.gopishetti/pnm } }} sinkRunners:{AVRO=SinkRunner: { > policy:org.apache.flume.sink.DefaultSinkProcessor@452d9a56 counterGroup:{ > name:null counters:{} } }} > channels:{MemChanneltest2=org.apache.flume.channel.MemoryChannel{name: > MemChanneltest2}} } > > 1:12:46.615 PMINFOorg.apache.flume.node.Application > > Starting Channel MemChanneltest2 > > 1:12:46.674 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Monitored counter group for type: CHANNEL, name: MemChanneltest2: > Successfully registered new MBean. > > 1:12:46.675 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Component type: CHANNEL, name: MemChanneltest2 started > > 1:12:46.675 PMINFOorg.apache.flume.node.Application > > Starting Sink AVRO > > 1:12:46.675 PMINFOorg.apache.flume.sink.AbstractRpcSink > > Starting RpcSink AVRO { host: sdldalplhdw02.suddenlink.cequel3.com, port: > 40002 }... > > 1:12:46.675 PMINFOorg.apache.flume.node.Application > > Starting Source SPOOL > > 1:12:46.676 PMINFOorg.apache.flume.source.SpoolDirectorySource > > SpoolDirectorySource source starting with directory: > /home/a_nikhil.gopishetti/pnm > > 1:12:46.676 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Monitored counter group for type: SINK, name: AVRO: Successfully registered > new MBean. > > 1:12:46.676 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Component type: SINK, name: AVRO started > > 1:12:46.676 PMINFOorg.apache.flume.sink.AbstractRpcSink > > Rpc sink AVRO: Building RpcClient with hostname: > sdldalplhdw02.suddenlink.cequel3.com, port: 40002 > > 1:12:46.676 PMINFOorg.apache.flume.sink.AvroSink > > Attempting to create Avro Rpc client. > > 1:12:46.699 PMWARNorg.apache.flume.api.NettyAvroRpcClient > > Using default maxIOWorkers > > 1:12:46.706 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Monitored counter group for type: SOURCE, name: SPOOL: Successfully > registered new MBean. > > 1:12:46.706 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup > > Component type: SOURCE, name: SPOOL started > > 1:12:46.718 PMINFOorg.mortbay.log > > Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via > org.mortbay.log.Slf4jLog > > 1:12:46.783 PMINFOorg.mortbay.log > > jetty-6.1.26.cloudera.4 > > 1:12:46.829 PMINFOorg.mortbay.log > > Started [email protected]:41414 > > 1:12:47.068 PMINFOorg.apache.flume.sink.AbstractRpcSink > > Rpc sink AVRO started. > > > > Regards, > Nik. > -- Thanks, Hari
