I'm also +1 on this.

The old spout behaviour was perfectly fine. I guess maxOffsetBehind was
added as a protection against fetching unavailable Kafka offsets, but it
doesn't really make sense to me in my Trident transactional topology, where
I can't afford to lose any data. In this case I would rather have my spout
stop processing data than skip some offsets because of an arbitrary
maxOffsetBehind config value. Others' opinions may vary, but I think
setting this to Long.MAX_VALUE would make a much better default, as it
would be closer to the old spout behaviour.
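To make the "stop rather than skip" preference concrete, here is a minimal sketch of such a policy. It is hypothetical and not part of storm-kafka: the class, method and exception names are illustrative, and it assumes the spout can learn the earliest and latest offsets the broker still retains. The committed offset is used only if Kafka can still serve it. Otherwise the spout fails loudly instead of jumping ahead.

```java
// Hypothetical "fail fast" offset policy; NOT the storm-kafka implementation.
// All names here are illustrative assumptions.
public final class FailFastOffsetPolicy {

    /** Thrown when the committed offset can no longer be served by Kafka. */
    static final class OffsetUnavailableException extends RuntimeException {
        OffsetUnavailableException(String message) {
            super(message);
        }
    }

    /**
     * Returns the committed offset if it lies within the retained range;
     * otherwise throws, so the gap is surfaced rather than silently skipped.
     *
     * @param committedOffset offset read back from ZooKeeper
     * @param earliestOffset  oldest offset the broker still retains (assumed known)
     * @param latestOffset    newest offset on the partition (assumed known)
     */
    static long resumeOffset(long committedOffset, long earliestOffset, long latestOffset) {
        if (committedOffset < earliestOffset || committedOffset > latestOffset) {
            throw new OffsetUnavailableException(
                "Committed offset " + committedOffset + " is outside the available range ["
                    + earliestOffset + ", " + latestOffset + "]");
        }
        // No lag threshold: resume exactly where the topology left off.
        return committedOffset;
    }

    public static void main(String[] args) {
        // The earliest offset of 10,000,000 is a made-up value for illustration.
        // Still retained: resume from the committed offset, however far behind it is.
        System.out.println(resumeOffset(15_266_940L, 10_000_000L, 22_092_614L));
        try {
            // Already removed by retention: stop instead of skipping ahead.
            resumeOffset(5_000_000L, 10_000_000L, 22_092_614L);
        } catch (OffsetUnavailableException e) {
            System.out.println("Stopping: " + e.getMessage());
        }
    }
}
```

The design choice is that lag alone never triggers a jump. Only genuinely unavailable offsets do, and even then the spout surfaces the problem instead of moving on.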

On Wednesday, July 9, 2014, Curtis Allen wrote:

> Hello,
>
> I’ve recently upgraded to Storm and storm-kafka 0.9.2-incubating,
> replacing the https://github.com/wurstmeister/storm-kafka-0.8-plus spout
> I was using previously.
>
> I had a large Kafka log that I needed to process, so I started my topology
> with
>
> storm.kafka.SpoutConfig spoutConfig = new SpoutConfig....
> spoutConfig.forceFromStart = true;
>
> I then needed to make some tweaks to my application code, so I restarted the
> topology with spoutConfig.forceFromStart = false, expecting to pick up
> where I left off in my Kafka log. Instead, the Kafka spout started from the
> latest offset. Upon investigation, I found these log messages in my Storm
> worker logs:
>
> 2014-07-09 18:02:15 s.k.PartitionManager [INFO] Read last commit offset from zookeeper: 15266940; old topology_id: ef3f1f89-f64c-4947-b6eb-0c7fb9adb9ea - new topology_id: 5747dba6-c947-4c4f-af4a-4f50a84817bf
> 2014-07-09 18:02:15 s.k.PartitionManager [INFO] Last commit offset from zookeeper: 15266940
> 2014-07-09 18:02:15 s.k.PartitionManager [INFO] Commit offset 22092614 is more than 100000 behind, resetting to startOffsetTime=-2
> 2014-07-09 18:02:15 s.k.PartitionManager [INFO] Starting Kafka prd-use1c-pr-08-kafka-kamq-0004:4 from offset 22092614
>
> Digging into the storm-kafka spout source, I found this line:
>
> https://github.com/apache/incubator-storm/blob/master/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java#L95
>
> To fix this problem, I ended up setting my spout config like so:
>
> spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
>
> Now finally to my question.
>
> Why would the Kafka spout skip to the latest offset by default if the
> current offset is more than 100000 behind?
>
> This seems like a bad default value: the spout literally skipped over
> months of data without any warning.
>
> Are the core contributors open to accepting a pull request that would set
> the default to Long.MAX_VALUE?
>
> Thanks,
>
> Curtis Allen
>
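The reset Curtis describes can be modelled as a single threshold check. The sketch below is a simplified, hypothetical model, not the PartitionManager source: the names are illustrative, it ignores forceFromStart and topology-id handling, and it assumes the "current" offset is the partition's latest offset, which matches the restart offset in the log. Plugging in the logged numbers, the lag is 22092614 - 15266940 = 6825674, far above the default of 100000, so the spout jumps ahead. With Long.MAX_VALUE, the lag between two non-negative offsets can never exceed the threshold, so the committed offset is always kept.

```java
// Simplified, hypothetical model of the maxOffsetBehind check discussed above.
// Not the storm-kafka source; names are illustrative assumptions.
public final class OffsetResetSketch {

    /**
     * Returns the offset the spout would resume from.
     *
     * @param committedOffset offset read back from ZooKeeper
     * @param latestOffset    partition's latest offset (assumed)
     * @param maxOffsetBehind the maxOffsetBehind threshold
     */
    static long resumeOffset(long committedOffset, long latestOffset, long maxOffsetBehind) {
        // Lag too large: jump to the latest offset, skipping everything in between.
        if (latestOffset - committedOffset > maxOffsetBehind) {
            return latestOffset;
        }
        // Otherwise resume where the topology left off.
        return committedOffset;
    }

    public static void main(String[] args) {
        long committed = 15_266_940L; // "Last commit offset from zookeeper" in the log
        long latest = 22_092_614L;    // offset the spout restarted from in the log

        // Threshold of 100000: lag of 6825674 exceeds it, so the spout skips ahead.
        System.out.println(resumeOffset(committed, latest, 100_000L));
        // Long.MAX_VALUE: for non-negative offsets the lag can never exceed it.
        System.out.println(resumeOffset(committed, latest, Long.MAX_VALUE));
    }
}
```

Run as-is, this prints 22092614 and then 15266940, mirroring the logged behaviour and the effect of the workaround.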


-- 
Danijel Schiavuzzi

W: www.schiavuzzi.com
T: +385989035562
Skype: danijels7
