Dear all,

I wrote a custom interceptor to insert the timestamp header that the HDFS 
sink uses.
First, I ran an example using a spooling directory source, a file channel and 
an HDFS sink. It worked fine.
Second, I changed the setup to use two machines, configured as follows:

FIRST MACHINE:
SPOOLING source(with my custom interceptor)
FILE channel
AVRO sink

SECOND MACHINE:
AVRO source
FILE channel
HDFS sink

Now the error I am getting (on the second machine) is:

ERROR hdfs.HDFSEventSink: process failed
java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the 
event to resolve time based bucketing. Please check that you're correctly 
populating timestamp header (for example using TimestampInterceptor source 
interceptor).
at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:375)
at java.lang.Long.valueOf(Long.java:525)
at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
... 5 more

It looks like the header is missing.
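
To illustrate why a missing header would end in "NumberFormatException: null", 
here is a minimal sketch. It assumes the sink looks up a header named 
"timestamp" (the key that TimestampInterceptor writes) and parses it with 
Long.valueOf, as the trace suggests. MissingTimestampDemo and bucketTimestamp 
are hypothetical names; this is not Flume's actual code.

```java
import java.util.HashMap;
import java.util.Map;

public class MissingTimestampDemo {
    // Hypothetical stand-in for the sink's header lookup (assumption:
    // the header key is "timestamp" and the value is epoch milliseconds).
    static long bucketTimestamp(Map<String, String> headers) {
        // headers.get(...) returns null when the key is absent, and
        // Long.valueOf((String) null) throws NumberFormatException.
        return Long.valueOf(headers.get("timestamp"));
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        try {
            bucketTimestamp(headers);
        } catch (NumberFormatException e) {
            System.out.println("missing header -> " + e);
        }

        headers.put("timestamp", "1357000000000");
        System.out.println("present header -> " + bucketTimestamp(headers));
    }
}
```

If this matches what the sink does, the events reaching the second machine 
carry no "timestamp" header at all, rather than a malformed one.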

FIRST MACHINE CONF:
tier1.sources.source1.type     = spooldir
tier1.sources.source1.spoolDir = /home/user/flume/data
tier1.sources.source1.batchSize = 1000
tier1.sources.source1.bufferMaxLines = 3000
tier1.sources.source1.fileHeader = true
tier1.sources.source1.fileSuffix=.COMPLETED
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = it1
tier1.sources.source1.interceptors.it1.type = com.pdi.koios.flume.interceptors.DatetimeInterceptor$Builder
tier1.sources.source1.interceptors.it1.preserveExisting=true
tier1.sources.source1.interceptors.it1.dateRegex=\\d{4}-\\d{2}-\\d{2}
tier1.sources.source1.interceptors.it1.dateFormat=yyyy-MM-dd

tier1.channels.channel1.type   = file
tier1.channels.channel1.checkpointDir = /home/user/flume/channelcheckpoint
tier1.channels.channel1.dataDirs = /home/user/flume/channeldata
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 10000

tier1.sinks.sink1.type = avro
tier1.sinks.sink1.hostname = 10.95.108.245
tier1.sinks.sink1.port = 4141
tier1.sinks.sink1.channel = channel1
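
The interceptor source is not included here. As a rough, hypothetical sketch 
of the kind of logic the three interceptor properties above (dateRegex, 
dateFormat, preserveExisting) imply, the following plain Java class finds the 
first date in some text and stores it as epoch milliseconds under a 
"timestamp" header. The class name, the use of UTC, the sample file name and 
the choice of which text gets searched are all assumptions for illustration; 
it does not use the Flume API.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DatetimeHeaderSketch {
    private final Pattern dateRegex;
    private final SimpleDateFormat dateFormat;
    private final boolean preserveExisting;

    DatetimeHeaderSketch(String regex, String format, boolean preserveExisting) {
        this.dateRegex = Pattern.compile(regex);
        this.dateFormat = new SimpleDateFormat(format);
        // Assumption: interpret dates as UTC so the result is deterministic.
        this.dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
        this.preserveExisting = preserveExisting;
    }

    // Puts the first date found in text into headers as epoch milliseconds.
    void intercept(Map<String, String> headers, String text) {
        if (preserveExisting && headers.containsKey("timestamp")) {
            return; // keep a value that was set upstream
        }
        Matcher m = dateRegex.matcher(text);
        if (!m.find()) {
            return; // no date in the text: header stays unset
        }
        try {
            long millis = dateFormat.parse(m.group()).getTime();
            headers.put("timestamp", Long.toString(millis));
        } catch (ParseException e) {
            // Matched text was not a valid date: header stays unset.
        }
    }

    public static void main(String[] args) {
        DatetimeHeaderSketch sketch =
            new DatetimeHeaderSketch("\\d{4}-\\d{2}-\\d{2}", "yyyy-MM-dd", true);
        Map<String, String> headers = new HashMap<>();
        // Hypothetical file name, used only as sample input.
        sketch.intercept(headers, "/home/user/flume/data/log-2013-01-15.txt");
        System.out.println(headers);
    }
}
```

Note that in this sketch any event whose text contains no matching date ends 
up without a "timestamp" header, which is one way the header could be absent 
downstream.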

SECOND MACHINE CONF:
tier1.sources.source1.type = avro
tier1.sources.source1.bind = 0.0.0.0
tier1.sources.source1.port = 4141
tier1.sources.source1.channels = channel1

tier1.channels.channel1.type   = file
tier1.channels.channel1.checkpointDir = /home/user/flume/channelcheckpoint
tier1.channels.channel1.dataDirs = /home/user/flume/channeldata
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 10000

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.hdfs.batchSize = 1000
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollTimeout = 10
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.path = /user/user/flume/%Y/%m/%d
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.writeFormat = Text
