Dear all,
I made a custom interceptor in order to insert the timestamp header that is
used by the HDFS sink.
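For reference, here is a minimal sketch of the kind of header logic such an interceptor performs. It is plain Java with no Flume classes, and it is not the actual DatetimeInterceptor: the class name, the method name, the choice to read the date from the event body, and the UTC time zone are all assumptions made for illustration. The only thing taken from Flume itself is that the HDFS sink expects a header named "timestamp" holding epoch milliseconds as a string.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for the interceptor's header logic (not the real class).
public class DatetimeHeaderSketch {
    // Header key the HDFS sink reads to resolve %Y/%m/%d in hdfs.path.
    static final String TIMESTAMP = "timestamp";

    // Finds the first date in the body and stores it as epoch millis.
    // Assumption: the date lives in the event body and is interpreted as UTC.
    static void addTimestamp(Map<String, String> headers, String body,
                             String dateRegex, String dateFormat,
                             boolean preserveExisting) {
        if (preserveExisting && headers.containsKey(TIMESTAMP)) {
            return; // keep the header that is already there
        }
        Matcher m = Pattern.compile(dateRegex).matcher(body);
        if (!m.find()) {
            return; // no date found: the header stays unset
        }
        SimpleDateFormat fmt = new SimpleDateFormat(dateFormat);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        try {
            long millis = fmt.parse(m.group()).getTime();
            headers.put(TIMESTAMP, Long.toString(millis));
        } catch (ParseException e) {
            // unparsable date: the header stays unset
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<String, String>();
        addTimestamp(headers, "2013-05-20 some log line",
                "\\d{4}-\\d{2}-\\d{2}", "yyyy-MM-dd", true);
        System.out.println(headers);
    }
}
```

Note that in this sketch an event whose body contains no matching date ends up without a timestamp header at all, which is one way an HDFS sink further down the line could see a missing header.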
First, I ran an example using a SPOOLING dir source, a FILE channel and an HDFS
sink. It worked fine.
Second, I changed the setup to use two machines. The configuration on each
of them was:
FIRST MACHINE:
SPOOLING source (with my custom interceptor)
FILE channel
AVRO sink
SECOND MACHINE:
AVRO source
FILE channel
HDFS sink
Now, the error that I am getting (on the second machine) is:
ERROR hdfs.HDFSEventSink: process failed
java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the
event to resolve time based bucketing. Please check that you're correctly
populating timestamp header (for example using TimestampInterceptor source
interceptor).
    at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
    at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NumberFormatException: null
    at java.lang.Long.parseLong(Long.java:375)
    at java.lang.Long.valueOf(Long.java:525)
    at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
    ... 5 more
It looks like the header is missing.
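The "Caused by" part of the trace is consistent with that reading: Long.valueOf is being handed a null string, which is what a header lookup returns when the key is absent. A small stand-alone sketch of that failure follows. The class and method names are made up for illustration and this is not Flume's BucketPath code; only the "timestamp" header key and the Long.valueOf call are taken from the error message and the trace above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reproduction of the failure mode, independent of Flume.
public class MissingTimestampSketch {
    // Mimics a sink that reads the "timestamp" header as epoch millis.
    static long parseTimestamp(Map<String, String> headers) {
        // headers.get(...) returns null when the key is absent
        return Long.valueOf(headers.get("timestamp"));
    }

    // Returns true when the header cannot be parsed as a long.
    static boolean failsToParse(Map<String, String> headers) {
        try {
            parseTimestamp(headers);
            return false;
        } catch (NumberFormatException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<String, String>();
        System.out.println("without header, fails: " + failsToParse(headers));

        headers.put("timestamp", "1369008000000");
        System.out.println("with header, fails: " + failsToParse(headers));
    }
}
```

If events arrive at the second machine without the header, a lookup like this fails for every one of them, regardless of how the HDFS sink itself is configured.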
FIRST MACHINE CONF:
tier1.sources.source1.type = spooldir
tier1.sources.source1.spoolDir = /home/user/flume/data
tier1.sources.source1.batchSize = 1000
tier1.sources.source1.bufferMaxLines = 3000
tier1.sources.source1.fileHeader = true
tier1.sources.source1.fileSuffix=.COMPLETED
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = it1
tier1.sources.source1.interceptors.it1.type = com.pdi.koios.flume.interceptors.DatetimeInterceptor$Builder
tier1.sources.source1.interceptors.it1.preserveExisting=true
tier1.sources.source1.interceptors.it1.dateRegex=\\d{4}-\\d{2}-\\d{2}
tier1.sources.source1.interceptors.it1.dateFormat=yyyy-MM-dd
tier1.channels.channel1.type = file
tier1.channels.channel1.checkpointDir = /home/user/flume/channelcheckpoint
tier1.channels.channel1.dataDirs = /home/user/flume/channeldata
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 10000
tier1.sinks.sink1.type = avro
tier1.sinks.sink1.hostname = 10.95.108.245
tier1.sinks.sink1.port = 4141
tier1.sinks.sink1.channel = channel1
SECOND MACHINE CONF:
tier1.sources.source1.type = avro
tier1.sources.source1.bind = 0.0.0.0
tier1.sources.source1.port = 4141
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type = file
tier1.channels.channel1.checkpointDir = /home/user/flume/channelcheckpoint
tier1.channels.channel1.dataDirs = /home/user/flume/channeldata
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 10000
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.hdfs.batchSize = 1000
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollTimeout = 10
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.path = /user/user/flume/%Y/%m/%d
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.writeFormat = Text