I still don't have a good understanding of how/where offsets are stored
when using the trident spouts, or how to configure them.
I'm getting this error at the moment (the topology was running fine a
couple of days ago...) related to offsets, and I'm not sure how to
investigate it:

2015-05-06T12:09:20.414+0200 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [708429188]; retrying with default start offset time from configuration. configured start offset time: [-2]
2015-05-06T12:09:20.415+0200 b.s.util [ERROR] Async loop died!

I'm using Storm 0.9.4 with the OpaqueTridentKafkaSpout.

Any ideas how I can resolve this? :) (I've pasted a couple of config
sketches below the quoted thread, in case they're useful.)

On Fri, May 1, 2015 at 2:05 PM, Josh <[email protected]> wrote:

> Hi Taylor,
>
> Could you be a bit more specific please? I can't find where the wiki
> describes the trident kafka spout storing offsets, or where it's done
> in the code base.
>
> The wiki explains that SpoutConfig (for the KafkaSpout) has the zkRoot
> property, which determines where it stores offsets in ZooKeeper. But
> TridentKafkaConfig (for the TransactionalTridentKafkaSpout and
> OpaqueTridentKafkaSpout) does not have this property.
>
> Thanks,
> Josh
>
> On Fri, May 1, 2015 at 12:27 PM, P. Taylor Goetz <[email protected]>
> wrote:
>
>> Hi Josh,
>>
>> The trident kafka spout stores offsets in zookeeper as well. See:
>>
>> https://github.com/apache/storm/tree/master/external/storm-kafka
>>
>> -Taylor
>>
>> On May 1, 2015, at 5:00 AM, Josh <[email protected]> wrote:
>>
>> Looks like this was some kind of network issue... I ran the topology
>> on another box and don't have the problem any more.
>>
>> I have a related question about the TransactionalTridentKafkaSpout
>> and OpaqueTridentKafkaSpout though: does anyone know where these
>> spouts store their partition offsets for recovery in the case of
>> failure?
>>
>> I know the normal (non-trident) KafkaSpout stores its offsets in
>> ZooKeeper, and it has a spout config setting, ZkHosts, which
>> determines where they are stored. But the trident Kafka spouts don't
>> have this setting. Looking at the source code, I can't find any place
>> where they persist offsets. They also use the SimpleConsumer, so
>> they're not relying on Kafka to persist offsets.
>>
>> On Wed, Apr 29, 2015 at 4:29 PM, Josh <[email protected]> wrote:
>>
>>> Hi all,
>>>
>>> I'm using the OpaqueTridentKafkaSpout to read batches of tuples from
>>> Kafka. I'm doing some filtering and aggregation, and then calling
>>> persistentAggregate to maintain a map state:
>>>
>>> stream
>>>     .each(new Fields("msg"), new MyFilter())
>>>     .each(new Fields("msg"), new ExtractFieldsFunction(), new Fields("id"))
>>>     .groupBy(new Fields("id"))
>>>     .persistentAggregate(MyMapState.getNonTransactional(), new CountAggregator(), new Fields("count"))
>>>     .parallelismHint(1);
>>>
>>> It works fine for the first batch, but then I'm hitting a very
>>> strange problem where, after the first batch, my map state is no
>>> longer called (i.e. there is a call to multiGet and multiPut for the
>>> first batch only).
>>>
>>> The spout is still providing tuples: when I debug, I can see that
>>> the filter and function both continue to process input tuples
>>> (indefinitely). But the map state never gets called again!
>>>
>>> Why would this happen?
>>>
>>> I found a couple of very similar threads, but they have gone
>>> unanswered:
>>>
>>> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJ
>>>
>>> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ
>>>
>>> Thanks for any help on this,
>>>
>>> Josh
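For reference, here's roughly how I'm building the spout config. This is
only a sketch: the ZooKeeper host and topic names are placeholders, and
the comments reflect my reading of the storm-kafka 0.9.4 source, where
KafkaUtils falls back to startOffsetTime when a stored offset is out of
range.

    import storm.kafka.ZkHosts;
    import storm.kafka.trident.OpaqueTridentKafkaSpout;
    import storm.kafka.trident.TridentKafkaConfig;

    public class SpoutConfigSketch {

        public static OpaqueTridentKafkaSpout buildSpout() {
            // Placeholder ZooKeeper host and topic name.
            TridentKafkaConfig conf = new TridentKafkaConfig(
                    new ZkHosts("zk1.example.com:2181"), "my-topic");

            // The fallback position the WARN line refers to:
            // -2 = kafka.api.OffsetRequest.EarliestTime() (the default),
            // -1 = kafka.api.OffsetRequest.LatestTime().
            // When a stored offset is out of range (e.g. the segments
            // were deleted by broker retention), the spout restarts
            // from this position.
            conf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

            // In 0.9.4, setting this to true makes the spout ignore any
            // previously stored offset and start from startOffsetTime
            // (the field was renamed ignoreZkOffsets in later releases).
            conf.forceFromStart = false;

            return new OpaqueTridentKafkaSpout(conf);
        }
    }

If retention really did delete the data up to offset 708429188, I'd have
expected restarting from EarliestTime to be the intended recovery path,
so the "Async loop died!" right afterwards is the part I can't explain.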

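And in case it helps anyone who finds this thread later: as far as I can
tell, the trident Kafka spouts don't take a zkRoot at all; the offsets
ride along in Trident's transactional state in ZooKeeper, and the
location of that state is controlled by topology-level settings rather
than the spout config. A sketch, based on my reading of
backtype.storm.Config in 0.9.4 (the hostname is a placeholder, and the
values shown are what I believe are the defaults):

    import java.util.Arrays;
    import backtype.storm.Config;

    public class TransactionalZkSketch {

        public static Config buildConf() {
            Config conf = new Config();

            // If unset, these fall back to the cluster's
            // storm.zookeeper.servers / storm.zookeeper.port.
            conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS,
                    Arrays.asList("zk1.example.com"));
            conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);

            // Trident's coordinator/spout metadata (including, I
            // believe, the Kafka offsets for the opaque spout) is
            // stored under this root, keyed by the txStateId passed
            // to TridentTopology.newStream().
            conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT,
                    "/transactional");

            return conf;
        }
    }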