Hello Team and Hari,
Thanks for your reply, Hari Shreedharan. If you look at the conf file:
pnmtest2.sources.SPOOL.spoolDir = /home/a_nikhil.gopishetti/pnm
#pnmtest2.sources.SPOOL.spoolDir = /home/s_sdldalplhdxxxedh/pnmtest2-poll-results
#pnmtest2.sources.SPOOL.spoolDir = /home/s_sdldalplhdxxxedh/pnm-poll-results-uat
Of the three spoolDir entries, only one is active, i.e.
pnmtest2.sources.SPOOL.spoolDir = /home/a_nikhil.gopishetti/pnm. The
other two are commented out with the # symbol.
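Understood that the spooldir source handles only a single directory. If I ever need to watch all three locations at once, I am assuming the way to do it is to define one spooldir source per directory on the same agent. Here is a rough sketch with made-up source names (SPOOL2, SPOOL3), not what I am running today; please correct me if this is not the right approach:

# hypothetical sketch: one spooldir source per directory, all feeding the same channel
pnmtest2.sources = SPOOL SPOOL2 SPOOL3
pnmtest2.sources.SPOOL2.type = spooldir
pnmtest2.sources.SPOOL2.spoolDir = /home/s_sdldalplhdxxxedh/pnmtest2-poll-results
pnmtest2.sources.SPOOL2.channels = MemChanneltest2
pnmtest2.sources.SPOOL3.type = spooldir
pnmtest2.sources.SPOOL3.spoolDir = /home/s_sdldalplhdxxxedh/pnm-poll-results-uat
pnmtest2.sources.SPOOL3.channels = MemChanneltest2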
As for the deserializer, here is the code:
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.serialization.EventDeserializer;
import org.apache.flume.serialization.ResettableInputStream;

public class WholeFileDeserializer implements EventDeserializer {

  private Context context = null;
  private ResettableInputStream stream = null;
  private volatile boolean isOpen;

  public WholeFileDeserializer(Context context, ResettableInputStream in) {
    this.context = context;
    this.stream = in;
    this.isOpen = true;
  }

  /** Reads the remainder of the stream and returns it as a single event. */
  @Override
  public Event readEvent() throws IOException {
    ensureOpen();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buffer = new byte[4096];
    int rc = 0;
    while (rc != -1) {
      rc = stream.read(buffer, 0, 4096);
      if (rc != -1) {
        out.write(buffer, 0, rc);
      }
    }
    if (out.size() > 0) {
      System.out.println("Read event " + out);
      return EventBuilder.withBody(out.toByteArray());
    } else {
      return null;
    }
  }

  /** Only one event per file is produced; larger batch requests are rejected. */
  @Override
  public List<Event> readEvents(int i) throws IOException {
    ensureOpen();
    if (i > 1) {
      throw new RuntimeException("WholeFileDeserializer creates one event for the whole file");
    }
    List<Event> events = new ArrayList<Event>(1);
    Event event = readEvent();
    if (event != null) {
      events.add(event);
    }
    return events;
  }

  @Override
  public void mark() throws IOException {
    ensureOpen();
    stream.mark();
  }

  @Override
  public void reset() throws IOException {
    ensureOpen();
    stream.reset();
  }

  @Override
  public void close() throws IOException {
    if (isOpen) {
      reset();
      stream.close();
      isOpen = false;
    }
  }

  private void ensureOpen() {
    if (!isOpen) {
      throw new IllegalStateException("Serializer has been closed");
    }
  }

  public static class Builder implements EventDeserializer.Builder {
    @Override
    public EventDeserializer build(Context context, ResettableInputStream in) {
      return new WholeFileDeserializer(context, in);
    }
  }
}
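Looking at the stack trace again, my guess is that the spooling reader calls readEvents() with the configured batchSize (100 in my conf), which would trip the RuntimeException above. One thing I am considering, purely as an untested sketch, is to have readEvents() return at most one event regardless of how many are requested, instead of throwing:

  // untested sketch: tolerate any requested batch size instead of throwing
  @Override
  public List<Event> readEvents(int numEvents) throws IOException {
    ensureOpen();
    // The whole file always maps to a single event, so ignore the requested
    // batch size and return a list with at most one entry.
    List<Event> events = new ArrayList<Event>(1);
    Event event = readEvent();
    if (event != null) {
      events.add(event);
    }
    return events;
  }

Alternatively, would simply setting batchSize = 1 on the SPOOL source be the safer fix? Please let me know which direction you recommend.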
Your time and suggestions are highly appreciated.
Thanks!
Regards,
Nik.
On Sun, Aug 30, 2015 at 2:17 PM, Hari Shreedharan <[email protected]> wrote:
>
> It looks like the deserializer you are using is throwing the exception,
> causing the source to not work. Also, the spool dir source can work with only
> one directory, so it is only reading files from one of the specified
> directories, not all three of them.
>
> On Sunday, August 30, 2015, Nikhil Gs <[email protected]> wrote:
>
>> Hello Team,
>>
>> Thank you in advance for your cooperation and time.
>>
>> I have been facing this issue repeatedly. I tried restarting Flume and
>> rebuilding the jar, but files are not being deleted from the Flume spool
>> directory. My error and flume config file are below.
>>
>> Any suggestions would be greatly appreciated. This is urgent because it
>> affects production.
>>
>>
>> # Sources, channels, and sinks are defined per
>> # agent name, in this case 'pnmtest2'.
>> pnmtest2.sources = SPOOL
>> pnmtest2.channels = MemChanneltest2
>> pnmtest2.sinks = AVRO
>>
>> # For each source, channel, and sink, set
>> # standard properties.
>> pnmtest2.sources.SPOOL.type = spooldir
>> pnmtest2.sources.SPOOL.spoolDir = /home/a_nikhil.gopishetti/pnm
>> #pnmtest2.sources.SPOOL.spoolDir = /home/s_sdldalplhdxxxedh/pnmtest2-poll-results
>> #pnmtest2.sources.SPOOL.spoolDir = /home/s_sdldalplhdxxxedh/pnm-poll-results-uat
>> pnmtest2.sources.SPOOL.ignorePattern = \.*tmp$
>> pnmtest2.sources.SPOOL.channels = MemChanneltest2
>> pnmtest2.sources.SPOOL.fileHeader = true
>> pnmtest2.sources.SPOOL.deletePolicy = immediate
>> pnmtest2.sources.SPOOL.consumeOrder = oldest
>> pnmtest2.sources.SPOOL.batchSize = 100
>>
>> pnmtest2.sources.SPOOL.interceptors = time
>> pnmtest2.sources.SPOOL.interceptors.time.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
>> pnmtest2.sources.SPOOL.deserializer = com.suddenlink.flume.WholeFileDeserializer$Builder
>>
>> pnmtest2.sinks.AVRO.type = avro
>> pnmtest2.sinks.AVRO.channel = MemChanneltest2
>> pnmtest2.sinks.AVRO.hostname = sdldalplhdw02.suddenlink.cequel3.com
>> pnmtest2.sinks.AVRO.port = 40002
>> pnmtest2.sinks.AVRO.batch-size = 100
>> pnmtest2.sinks.AVRO.connect-timeout = 40000
>>
>> #pnmtest2.channels.MemChanneltest1.capacity = 10000
>> #pnmtest2.channels.MemChanneltest1.type = memory
>>
>> pnmtest2.channels.MemChanneltest2.capacity = 1000000
>> pnmtest2.channels.MemChanneltest2.type = memory
>>
>>
>> *Flume Log Error*
>>
>>
>> 12:02:57.363 PM  INFO   org.mortbay.log - Started [email protected]:41414
>> 12:02:57.621 PM  INFO   org.apache.flume.sink.AbstractRpcSink - Rpc sink AVRO started.
>> 12:07:30.095 PM  ERROR  org.apache.flume.source.SpoolDirectorySource -
>> *FATAL: Spool Directory source SPOOL: { spoolDir: /home/a_nikhil.gopishetti/pnm/ }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
>> java.lang.RuntimeException: WholeFileDeserializer creates one event for the whole file
>>     at com.suddenlink.flume.WholeFileDeserializer.readEvents(WholeFileDeserializer.java:59)
>>     at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:252)
>>     at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:228)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>     at java.lang.Thread.run(Thread.java:745)*
>>
>> 1:12:31.161 PM  INFO  org.apache.flume.lifecycle.LifecycleSupervisor - Stopping lifecycle supervisor 11
>> 1:12:31.163 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Component type: CHANNEL, name: MemChanneltest2 stopped
>> 1:12:31.163 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.start.time == 1440954177242
>> 1:12:31.163 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.stop.time == 1440958351163
>> 1:12:31.163 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.capacity == 1000000
>> 1:12:31.163 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.current.size == 0
>> 1:12:31.164 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.event.put.attempt == 0
>> 1:12:31.164 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.event.put.success == 0
>> 1:12:31.164 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.event.take.attempt == 523
>> 1:12:31.164 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.event.take.success == 0
>> 1:12:31.164 PM  INFO  org.apache.flume.node.PollingPropertiesFileConfigurationProvider - Configuration provider stopping
>> 1:12:31.164 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Component type: SOURCE, name: SPOOL stopped
>> 1:12:31.164 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: SOURCE, name: SPOOL. source.start.time == 1440954177279
>> 1:12:31.164 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: SOURCE, name: SPOOL. source.stop.time == 1440958351164
>> 1:12:31.164 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: SOURCE, name: SPOOL. src.append-batch.accepted == 0
>> 1:12:31.165 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: SOURCE, name: SPOOL. src.append-batch.received == 0
>> 1:12:31.165 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: SOURCE, name: SPOOL. src.append.accepted == 0
>> 1:12:31.165 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: SOURCE, name: SPOOL. src.append.received == 0
>> 1:12:31.165 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: SOURCE, name: SPOOL. src.events.accepted == 0
>> 1:12:31.165 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: SOURCE, name: SPOOL. src.events.received == 0
>> 1:12:31.165 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: SOURCE, name: SPOOL. src.open-connection.count == 0
>> 1:12:31.165 PM  INFO  org.apache.flume.source.SpoolDirectorySource - SpoolDir source SPOOL stopped. Metrics: SOURCE:SPOOL{src.events.accepted=0, src.events.received=0, src.append.accepted=0, src.append-batch.accepted=0, src.open-connection.count=0, src.append-batch.received=0, src.append.received=0}
>> 1:12:31.165 PM  INFO  org.apache.flume.sink.AbstractRpcSink - Rpc sink AVRO stopping...
>> 1:12:31.170 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Component type: SINK, name: AVRO stopped
>> 1:12:31.170 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: SINK, name: AVRO. sink.start.time == 1440954177243
>> 1:12:31.170 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: SINK, name: AVRO. sink.stop.time == 1440958351170
>> 1:12:31.170 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: SINK, name: AVRO. sink.batch.complete == 0
>> 1:12:31.170 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: SINK, name: AVRO. sink.batch.empty == 523
>> 1:12:31.171 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: SINK, name: AVRO. sink.batch.underflow == 0
>> 1:12:31.171 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: SINK, name: AVRO. sink.connection.closed.count == 1
>> 1:12:31.171 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: SINK, name: AVRO. sink.connection.creation.count == 1
>> 1:12:31.171 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: SINK, name: AVRO. sink.connection.failed.count == 0
>> 1:12:31.171 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: SINK, name: AVRO. sink.event.drain.attempt == 0
>> 1:12:31.171 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Shutdown Metric for type: SINK, name: AVRO. sink.event.drain.sucess == 0
>> 1:12:31.171 PM  INFO  org.apache.flume.sink.AbstractRpcSink - Rpc sink AVRO stopped. Metrics: SINK:AVRO{sink.connection.closed.count=1, sink.event.drain.attempt=0, sink.batch.underflow=0, sink.connection.failed.count=0, sink.connection.creation.count=1, sink.event.drain.sucess=0, sink.batch.empty=523, sink.batch.complete=0}
>> 1:12:31.171 PM  INFO  org.mortbay.log - Stopped [email protected]:41414
>>
>> 1:12:46.490 PM  INFO  org.apache.flume.node.PollingPropertiesFileConfigurationProvider - Configuration provider starting
>> 1:12:46.513 PM  INFO  org.apache.flume.node.PollingPropertiesFileConfigurationProvider - Reloading configuration file:/var/run/cloudera-scm-agent/process/5683-flume-AGENT/flume.conf
>> 1:12:46.520 PM  INFO  org.apache.flume.conf.FlumeConfiguration - Processing:AVRO
>> 1:12:46.521 PM  INFO  org.apache.flume.conf.FlumeConfiguration - Processing:AVRO
>> 1:12:46.521 PM  INFO  org.apache.flume.conf.FlumeConfiguration - Processing:AVRO
>> 1:12:46.521 PM  INFO  org.apache.flume.conf.FlumeConfiguration - Processing:AVRO
>> 1:12:46.522 PM  INFO  org.apache.flume.conf.FlumeConfiguration - Added sinks: AVRO Agent: pnmtest2
>> 1:12:46.522 PM  INFO  org.apache.flume.conf.FlumeConfiguration - Processing:AVRO
>> 1:12:46.522 PM  INFO  org.apache.flume.conf.FlumeConfiguration - Processing:AVRO
>> 1:12:46.554 PM  INFO  org.apache.flume.conf.FlumeConfiguration - Post-validation flume configuration contains configuration for agents: [pnmtest2]
>> 1:12:46.554 PM  INFO  org.apache.flume.node.AbstractConfigurationProvider - Creating channels
>> 1:12:46.564 PM  INFO  org.apache.flume.channel.DefaultChannelFactory - Creating instance of channel MemChanneltest2 type memory
>> 1:12:46.569 PM  INFO  org.apache.flume.node.AbstractConfigurationProvider - Created channel MemChanneltest2
>> 1:12:46.570 PM  INFO  org.apache.flume.source.DefaultSourceFactory - Creating instance of source SPOOL, type spooldir
>> 1:12:46.597 PM  INFO  org.apache.flume.sink.DefaultSinkFactory - Creating instance of sink: AVRO, type: avro
>> 1:12:46.604 PM  INFO  org.apache.flume.sink.AbstractRpcSink - Connection reset is set to 0. Will not reset connection to next hop
>> 1:12:46.606 PM  INFO  org.apache.flume.node.AbstractConfigurationProvider - Channel MemChanneltest2 connected to [SPOOL, AVRO]
>> 1:12:46.615 PM  INFO  org.apache.flume.node.Application - Starting new configuration:{ sourceRunners:{SPOOL=EventDrivenSourceRunner: { source:Spool Directory source SPOOL: { spoolDir: /home/a_nikhil.gopishetti/pnm } }} sinkRunners:{AVRO=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@452d9a56 counterGroup:{ name:null counters:{} } }} channels:{MemChanneltest2=org.apache.flume.channel.MemoryChannel{name: MemChanneltest2}} }
>> 1:12:46.615 PM  INFO  org.apache.flume.node.Application - Starting Channel MemChanneltest2
>> 1:12:46.674 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Monitored counter group for type: CHANNEL, name: MemChanneltest2: Successfully registered new MBean.
>> 1:12:46.675 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Component type: CHANNEL, name: MemChanneltest2 started
>> 1:12:46.675 PM  INFO  org.apache.flume.node.Application - Starting Sink AVRO
>> 1:12:46.675 PM  INFO  org.apache.flume.sink.AbstractRpcSink - Starting RpcSink AVRO { host: sdldalplhdw02.suddenlink.cequel3.com, port: 40002 }...
>> 1:12:46.675 PM  INFO  org.apache.flume.node.Application - Starting Source SPOOL
>> 1:12:46.676 PM  INFO  org.apache.flume.source.SpoolDirectorySource - SpoolDirectorySource source starting with directory: /home/a_nikhil.gopishetti/pnm
>> 1:12:46.676 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Monitored counter group for type: SINK, name: AVRO: Successfully registered new MBean.
>> 1:12:46.676 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Component type: SINK, name: AVRO started
>> 1:12:46.676 PM  INFO  org.apache.flume.sink.AbstractRpcSink - Rpc sink AVRO: Building RpcClient with hostname: sdldalplhdw02.suddenlink.cequel3.com, port: 40002
>> 1:12:46.676 PM  INFO  org.apache.flume.sink.AvroSink - Attempting to create Avro Rpc client.
>> 1:12:46.699 PM  WARN  org.apache.flume.api.NettyAvroRpcClient - Using default maxIOWorkers
>> 1:12:46.706 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Monitored counter group for type: SOURCE, name: SPOOL: Successfully registered new MBean.
>> 1:12:46.706 PM  INFO  org.apache.flume.instrumentation.MonitoredCounterGroup - Component type: SOURCE, name: SPOOL started
>> 1:12:46.718 PM  INFO  org.mortbay.log - Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
>> 1:12:46.783 PM  INFO  org.mortbay.log - jetty-6.1.26.cloudera.4
>> 1:12:46.829 PM  INFO  org.mortbay.log - Started [email protected]:41414
>> 1:12:47.068 PM  INFO  org.apache.flume.sink.AbstractRpcSink - Rpc sink AVRO started.
>>
>>
>>
>> Regards,
>> Nik.
>>
>
>
> --
>
> Thanks,
> Hari
>
>
>