You are probably hitting https://issues.apache.org/jira/browse/FLUME-2578

As a workaround, add a key to each message, or build Flume with that patch applied.
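For context, the NPE happens when the source hits a Kafka message whose key is null. The gist of the fix is just a null guard before the key is copied into the event headers. Here is that logic sketched in Python (illustrative only; the real patch is in KafkaSource.java, and the function name here is mine):

```python
def build_headers(topic, key):
    """Build Flume-style event headers from a Kafka message.

    Sketch of the guard added by FLUME-2578: a keyless Kafka message
    arrives with key=None, and storing it without a null check is what
    blew up with the NullPointerException.
    """
    headers = {"topic": topic}
    if key is not None:  # the missing null check
        headers["key"] = key
    return headers

print(build_headers("Events3", None))   # {'topic': 'Events3'}
print(build_headers("Events3", "k1"))   # {'topic': 'Events3', 'key': 'k1'}
```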

Alex Bohr wrote:

Hi,
I've been testing out Flafka for a few days but I've been getting lots
of errors and message loss.

Would really appreciate some advice before I abandon it for alternate
options.

We are still getting lots of these errors:
2015-03-10 04:04:19,099
(PollableSourceRunner-KafkaSource-kafka-source-1) [ERROR -
org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:139)]
KafkaSource EXCEPTION, {}
java.lang.NullPointerException

Here's the version:
Flume 1.6.0-SNAPSHOT
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 3d03053615694ca638e5ddf314081826b8a5f1ac
Compiled by jenkins on Thu Feb 26 06:06:42 UTC 2015
From source with checksum 3a3f330e678e5790c8c8e5b99063eea8

Our Kafka events are not stored with a key, but from looking at the
source I believe this patch is included and should handle that:
https://issues.apache.org/jira/secure/attachment/12688560/FLUME-2578.0.patch

So I'm not sure what it's still complaining about.

I'm also getting lots of errors on the sink, and lots of files are left
with a ".tmp" suffix. I see that's a known issue, but I'm not sure how
to handle it: how do I know when I can start ingesting a file if it
still has the ".tmp" suffix? Do I need to see if the file size
changes, or check the last modified time?
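The best I've come up with so far is something like this sketch: skip anything still carrying the in-use suffix, and also require the file to have been quiet for a while before ingesting it. (The helper name and the threshold are mine; is this a reasonable approach?)

```python
import os
import time

def ready_files(directory, quiet_seconds=180):
    """Return files that look safe to ingest: no '.tmp' suffix (the
    HDFS sink is still writing those) and not modified within the last
    quiet_seconds, as a guard for files whose final rename lagged.
    Illustrative sketch; the threshold is an assumption."""
    now = time.time()
    ready = []
    for name in sorted(os.listdir(directory)):
        if name.endswith(".tmp"):
            continue  # sink still owns this file
        path = os.path.join(directory, name)
        if now - os.path.getmtime(path) >= quiet_seconds:
            ready.append(name)
    return ready
```

With rollInterval=120, a ".tmp" file that is much older than a couple of roll intervals presumably belongs to a dead agent and needs manual inspection, but I'd like confirmation of that.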

We are trying to ingest about 100K QPS from a Kafka queue with 1024
partitions. I'm not spinning up that many Flume agents; we have 4
machines running 20 agents each. The offsets are holding fairly
steady, so we're keeping up over a few hours of testing.
But the end result shows about 1% message loss.

Any advice on some configs I should tune to try to reduce the
errors? I've been playing with transaction capacity and batch sizes,
but no improvement.

# Sources, channels, and sinks are defined per
# agent name, in this case flume1.
flume1.sources = kafka-source-1
flume1.channels = hdfs-channel-1
flume1.sinks = hdfs-sink-1

# For each source, channel, and sink, set
# standard properties.
flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-1.zookeeperConnect = xxx.xx.xx.107:2181/kafkaCluster
flume1.sources.kafka-source-1.topic = Events3
flume1.sources.kafka-source-1.groupId = floomTest
flume1.sources.kafka-source-1.batchSize = 10000
flume1.sources.kafka-source-1.channels = hdfs-channel-1

flume1.channels.hdfs-channel-1.type = memory
flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
flume1.sinks.hdfs-sink-1.type = hdfs
flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Writable
flume1.sinks.hdfs-sink-1.hdfs.fileType = SequenceFile
flume1.sinks.hdfs-sink-1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%{host}-1-sequence-events
flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-1.hdfs.path = hdfs://xxx.xx.xxx.41:8020/user/gxetl/test_flume/%{topic}
flume1.sinks.hdfs-sink-1.hdfs.rollCount = 0
flume1.sinks.hdfs-sink-1.hdfs.rollSize = 0
flume1.sinks.hdfs-sink-1.hdfs.rollInterval = 120

# Other properties are specific to each type of
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel.
flume1.channels.hdfs-channel-1.capacity = 500000
flume1.channels.hdfs-channel-1.transactionCapacity = 100000
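One more question: could the 1% loss simply be the memory channel dropping in-flight events when an agent dies or the channel fills? If durability matters more than raw throughput here, would swapping in a file channel along these lines help (paths below are placeholders, not our real layout)?

```
flume1.channels.hdfs-channel-1.type = file
flume1.channels.hdfs-channel-1.checkpointDir = /flume/checkpoint
flume1.channels.hdfs-channel-1.dataDirs = /flume/data
flume1.channels.hdfs-channel-1.capacity = 500000
flume1.channels.hdfs-channel-1.transactionCapacity = 100000
```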

Any advice much appreciated!
Thanks!

