Hello,

I’ve recently upgraded to storm and storm-kafka 0.9.2-incubating, replacing
the https://github.com/wurstmeister/storm-kafka-0.8-plus spout I was using
previously.

I have a large kafka log that I needed processed. I started my topology
with

storm.kafka.SpoutConfig spoutConfig = new SpoutConfig....
spoutConfig.forceFromStart = true;

I then needed to make some tweaks in my application code and restarted the
topology with spoutConfig.forceFromStart = false, expecting to pick up
where I left off in my kafka log. Instead, the kafka spout started from the
latest offset. Upon investigation I found these log messages in my storm
worker logs:

2014-07-09 18:02:15 s.k.PartitionManager [INFO] Read last commit
offset from zookeeper: 15266940; old topology_id:
ef3f1f89-f64c-4947-b6eb-0c7fb9adb9ea - new topology_id:
5747dba6-c947-4c4f-af4a-4f50a84817bf
2014-07-09 18:02:15 s.k.PartitionManager [INFO] Last commit offset
from zookeeper: 15266940
2014-07-09 18:02:15 s.k.PartitionManager [INFO] Commit offset 22092614
is more than 100000 behind, resetting to startOffsetTime=-2
2014-07-09 18:02:15 s.k.PartitionManager [INFO] Starting Kafka
prd-use1c-pr-08-kafka-kamq-0004:4 from offset 22092614

Digging in the storm-kafka spout I found this line
https://github.com/apache/incubator-storm/blob/master/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java#L95

To fix this problem I ended up setting my spout config like so

spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
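
For anyone following along, here is a simplified, standalone Java sketch of
the check as I understand it from the log messages above. It is not the
actual PartitionManager code: the class and method names are made up for
illustration, and the offsets are the ones from my worker log.

```java
// Simplified sketch of the "too far behind" check; NOT the actual storm-kafka code.
// OffsetCheckSketch and startingOffset are hypothetical names used for illustration.
class OffsetCheckSketch {

    // Returns the offset the spout would start reading from.
    // committedOffset: last offset read back from zookeeper
    // latestOffset:    offset the spout resets to (the latest offset in my case)
    // maxOffsetBehind: how far behind the committed offset may be before a reset
    static long startingOffset(long committedOffset, long latestOffset, long maxOffsetBehind) {
        if (latestOffset - committedOffset > maxOffsetBehind) {
            // Too far behind: discard the committed offset and jump ahead.
            return latestOffset;
        }
        // Within the allowed lag: resume where the previous topology left off.
        return committedOffset;
    }

    public static void main(String[] args) {
        long committed = 15266940L; // "Read last commit offset from zookeeper"
        long latest = 22092614L;    // "Starting Kafka ... from offset"

        // Default of 100000: the gap is 6825674, so the spout jumps ahead.
        System.out.println(startingOffset(committed, latest, 100000L));        // 22092614

        // Long.MAX_VALUE: the gap can never exceed it, so the spout resumes.
        System.out.println(startingOffset(committed, latest, Long.MAX_VALUE)); // 15266940
    }
}
```

With the default, any topology that falls more than 100000 messages behind
silently loses its place on restart, which is what happened to me.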

Now finally to my question.

Why would the kafka spout, by default, skip to the latest offset if the
current offset is more than 100000 behind?

This seems like a bad default value: the spout literally skipped over
months of data without any warning.

Are the core contributors open to accepting a pull request that would set
the default to Long.MAX_VALUE?

Thanks,

Curtis Allen