Also, for monitoring a running spout, one could monitor the offset metrics produced by trident/TridentKafkaEmitter.java
From: jeffery maass <[email protected]<mailto:[email protected]>> Reply-To: "[email protected]<mailto:[email protected]>" <[email protected]<mailto:[email protected]>> Date: 2015,Wednesday, May 6 at 10:35 To: "[email protected]<mailto:[email protected]>" <[email protected]<mailto:[email protected]>> Subject: Re: Trident persistentAggregate only working for the first batch! Is there a way to find out which Kafka offset trident has consumed up to, by looking in Zookeeper?: ---------------------------------------------------------------------------------------------------------------------------------------- There is an app, specific to Kafka, which will monitor a consumer’s offset. I would recommend only using it in testing, as I believe, in my environment, it caused locking of some sort. Note that it is looking at the offset data stored by kafka, not the offset data stored by the spout. https://github.com/quantifind/KafkaOffsetMonitor Regarding the statement: I think you can configure it to ignore metadata and always read from the head or tail.. -------------------- That configuration should only be done if the logic of your application is ok with * head: replaying previously processed records * tail: not playing / skipping an unknown volume of records I wouldn’t necessarily “reset” the offsets in zookeeper. Shouldn’t the spout eventually catch up to the Kafka partitions / find a valid / existing offset? Like this: try 3, exception try 4, exception try 5, exception try 6, returns valid data From: Josh <[email protected]<mailto:[email protected]>> Reply-To: "[email protected]<mailto:[email protected]>" <[email protected]<mailto:[email protected]>> Date: 2015,Wednesday, May 6 at 09:43 To: "[email protected]<mailto:[email protected]>" <[email protected]<mailto:[email protected]>>, Nikhil Singh <[email protected]<mailto:[email protected]>> Subject: Re: Trident persistentAggregate only working for the first batch! Hi Nikhil, Thanks for the reply. OK that makes sense, I was trying to figure out where trident is storing stuff in zookeeper - turns out when I was using LocalCluster it was starting its own local Zookeeper instance on port 2000. But I've figured out how to configure it now: by setting the storm.zookeeper.servers property in storm.yaml. I can see it's storing batch IDs under: /transactional/{spout_name} and like you said deleting the data under the /transactional root has fixed the exception. Just wondering, how do the batch IDs translate to Kafka offsets? Is there a way to find out which Kafka offset trident has consumed up to, by looking in Zookeeper? Thanks, Josh On Wed, May 6, 2015 at 2:09 PM, Nikhil Singh <[email protected]<mailto:[email protected]>> wrote: Hi Josh, I think you need to clear the Kafka spout's metadata from the trident zookeeper. It has old metadata stored and the offset it has stored is no longer present in the Kafka queues. I think you can configure it to ignore metadata and always read from the head or tail.. -Nikhil On Wednesday, May 6, 2015 5:24 AM, Josh <[email protected]<mailto:[email protected]>> wrote: I still don't have a good understanding of how/where offsets are stored when using the trident spouts and how to configure it. I'm getting this error at the moment (the topology was running fine a couple of days ago...) related to offsets, and not sure how to investigate it: 2015-05-06T12:09:20.414+0200 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [708429188]; retrying with default start offset time from configuration. configured start offset time: [-2] 2015-05-06T12:09:20.415+0200 b.s.util [ERROR] Async loop died! I'm using storm 0.9.4 with the OpaqueTrdientKafkaSpout Any ideas how I can resolve this? :) On Fri, May 1, 2015 at 2:05 PM, Josh <[email protected]<mailto:[email protected]>> wrote: Hi Taylor, Could you be a bit more specific please? I can't find where the wiki describes the trident kafka spout storing offsets or where it's done in the code base. The wiki explains that SpoutConfig (for the KafkaSpout) has the zkRoot property which determines where it stores zookeeper offsets. But TridentKafkaConfig (for the TransactionalTridentKafkaSpout and OpaqueTridentKafkaSpout) does not have this property. Thanks, Josh On Fri, May 1, 2015 at 12:27 PM, P. Taylor Goetz <[email protected]<mailto:[email protected]>> wrote: Hi Josh, The trident kafka spout stores offsets in zookeeper as well. See: https://github.com/apache/storm/tree/master/external/storm-kafka -Taylor On May 1, 2015, at 5:00 AM, Josh <[email protected]<mailto:[email protected]>> wrote: Looks like this was some kind of network issue... I ran the topology on another box and don't have the problem any more. I have a related question about the TransactionalTridentKafkaSpout and OpaqueTridentKafkaSpout though: does anyone know where these spouts store their partition offsets to use for recovery in the case of failure? I know the normal (non-trident) Kafka spout stores its offsets in Zookeeper, and it has a spout config setting ZkHosts which determines where they are stored. But the trident Kafka spouts don't have this setting. Looking at the source code I can't find any place where it is persisting offsets. Plus it's using the SimpleConsumer so it's not relying on Kafka persisting its offsets. On Wed, Apr 29, 2015 at 4:29 PM, Josh <[email protected]<mailto:[email protected]>> wrote: Hi all, I'm using the OpaqueTridentKafkaSpout to read batches of tuples from Kafka, I'm doing some filtering and aggregation and then calling persistAggregate to maintain a map state: stream .each(new Fields("msg"), new MyFilter()) .each(new Fields("msg"), new ExtractFieldsFunction(), new Fields("id")) .groupBy(new Fields("id")) .persistentAggregate(MyMapState.getNonTransactional, new CountAggregator(), new Fields("count")) .parallelismHint(1) It works fine, for the first batch, but then I am having a very strange problem where after the first batch my map state is no longer called. (i.e. there a call to multiGet and multiPut for the first batch only). The spout is still providing tuples, as when I debug I can see that the filter and function both continue to process input tuples (indefinitely). But the map state never gets called again! Why would this happen? I found a couple of very similar threads to this, but they have gone unanswered: https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJ https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ Thanks for any help on this, Josh
