Hello Experts,

I am trying to convert events from a custom data source received in Flume
into Avro and push them to HDFS. The pipeline I am attempting is:

syslog -> Flume -> Flume interceptor that converts the event body into
avroObject.toByteArray() -> HDFS sink serializer that decodes the byte array
back into Avro
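
For context, here is a minimal sketch of what the interceptor does (the
class name matches my config, but the parsing and field names here are
illustrative, not the actual code):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;

import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class CustomToAvroConvertInterceptor implements Interceptor {

  public void initialize() { }

  public Event intercept(Event event) {
    try {
      // parse the raw syslog body into an Avro record (parsing omitted here)
      GenericRecord record = parseSyslog(event.getBody());

      // serialize the record to a raw (schema-less) Avro byte array
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
      new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
      encoder.flush();
      event.setBody(out.toByteArray());
      return event;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public List<Event> intercept(List<Event> events) {
    for (Event e : events) {
      intercept(e);
    }
    return events;
  }

  public void close() { }

  private GenericRecord parseSyslog(byte[] body) {
    // custom syslog-to-record parsing lives here in the real code
    throw new UnsupportedOperationException("sketch only");
  }

  public static class Builder implements Interceptor.Builder {
    public Interceptor build() {
      return new CustomToAvroConvertInterceptor();
    }
    public void configure(Context context) {
      // reads dataSourceType etc. from the interceptor config
    }
  }
}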

The flume configuration looks like:

tier1.sources.syslogsource.interceptors.i2.type=timestamp
tier1.sources.syslogsource.interceptors.i2.preserveExisting=true
tier1.sources.syslogsource.interceptors.i1.dataSourceType=DataSource1
tier1.sources.syslogsource.interceptors.i1.type = com.flume.CustomToAvroConvertInterceptor$Builder

#hdfs sink for archival and batch analysis
tier1.sinks.hdfssink.type = hdfs
tier1.sinks.hdfssink.hdfs.writeFormat = Text
tier1.sinks.hdfssink.hdfs.fileType = DataStream
tier1.sinks.hdfssink.hdfs.filePrefix=%{flumeHost}-%{host}%{customerId}-%Y%m%d-%H
tier1.sinks.hdfssink.hdfs.inUsePrefix=_
tier1.sinks.hdfssink.hdfs.path=/hive/rawavro/customer_id=%{customerId}/date=%Y%m%d/hr=%H
tier1.sinks.hdfssink.hdfs.fileSuffix=.avro
# roll the file every 10 * 60 seconds = 600
tier1.sinks.hdfssink.hdfs.rollInterval=600
# count-based rolling is disabled (rollCount=0); files roll on time only
tier1.sinks.hdfssink.hdfs.rollCount=0
tier1.sinks.hdfssink.hdfs.batchSize = 100
tier1.sinks.hdfssink.hdfs.rollSize=0
tier1.sinks.hdfssink.serializer=com.flume.RawAvroHiveSerializer$Builder
tier1.sinks.hdfssink.serializer.compressionCodec=snappy
tier1.sinks.hdfssink.channel = hdfsmem

When I use tier1.sinks.hdfssink.serializer=avro_event, binary data is stored
in HDFS; the body of each event is the byte array produced by
CustomToAvroConvertInterceptor.intercept() (i.e. avroObject.toByteArray()).
However, Hive cannot parse this data, presumably because avro_event wraps
each event in Flume's generic event schema (a headers map plus the body as
opaque bytes) rather than my custom schema. As a result, I see all NULLs in
the column values.
Based on the Avro FAQ entry on serializing directly to/from a byte array
(https://cwiki.apache.org/confluence/display/AVRO/FAQ#FAQ-HowcanIserializedirectlyto/fromabytearray?),
all RawAvroHiveSerializer.convert() does is decode the byte array with a
BinaryDecoder.
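
Concretely, convert() does roughly the following (a sketch; the class
wrapper and schema handling here are illustrative):

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.flume.Event;

// sketch of the decode path inside RawAvroHiveSerializer
public class RawAvroDecode {
  private final GenericDatumReader<GenericRecord> reader;
  private BinaryDecoder decoder = null;  // reused across events

  public RawAvroDecode(Schema schema) {
    // must be the same schema the interceptor serialized with
    this.reader = new GenericDatumReader<GenericRecord>(schema);
  }

  // decode the event body (avroObject.toByteArray() from the interceptor)
  // back into an Avro record, per the FAQ recipe
  public GenericRecord convert(Event event) throws IOException {
    decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder);
    return reader.read(null, decoder);
  }
}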
The exception I get seems to be unrelated to that code itself, so I am
pasting the stack trace here. I will share the code if it is needed to
identify the root cause:

2016-07-26 19:15:27,187 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: org.apache.avro.AvroRuntimeException: not open
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.avro.AvroRuntimeException: not open
        at org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
        at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:299)
        at org.apache.flume.serialization.AbstractAvroEventSerializer.write(AbstractAvroEventSerializer.java:108)
        at org.apache.flume.sink.hdfs.HDFSDataStream.append(HDFSDataStream.java:124)
        at org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:550)
        at org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:547)
        at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
        at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)

I can reproduce this on the local file system as well. In that test case, I
tried opening the file with append=true and still hit the same exception.
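
For reference, here is a minimal standalone snippet that hits the same
failure mode on the local file system (illustrative; the point is that
DataFileWriter.append() throws "not open" whenever create() or appendTo()
was never called on the writer, regardless of how the underlying stream was
opened):

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class NotOpenRepro {
  public static void main(String[] args) throws IOException {
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Rec\","
        + "\"fields\":[{\"name\":\"msg\",\"type\":\"string\"}]}");

    GenericRecord record = new GenericData.Record(schema);
    record.put("msg", "hello");

    DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(
        new GenericDatumWriter<GenericRecord>(schema));

    // opening the stream with append=true (as in my test case) does not help;
    // without writer.create(schema, out) or writer.appendTo(file) first,
    // the append below throws org.apache.avro.AvroRuntimeException: not open
    FileOutputStream out = new FileOutputStream(new File("repro.avro"), true);
    writer.append(record);
    out.close();
  }
}

Calling writer.create(schema, out) before the append makes this snippet pass.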

Appreciate any guidance in this regard.

regards
Sunita
