Hi Nikhil,
Thanks for the reply, that makes sense. I was trying to figure out where
Trident stores its state in ZooKeeper. It turns out that when I was using
LocalCluster it was starting its own local ZooKeeper instance on port 2000.
I've figured out how to configure it now: by setting the
storm.zookeeper.servers property in storm.yaml.
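In case it helps anyone else, here is roughly how I understand the lookup, as a minimal sketch. A plain java.util.Map stands in for the parsed storm.yaml, the host names are made up, and the fallback from transactional.zookeeper.servers to storm.zookeeper.servers is my reading of the Storm source, not something I have verified on 0.9.4:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ZkConfigSketch {

    // Returns the ZooKeeper hosts the transactional (Trident) state would use:
    // transactional.zookeeper.servers if set, otherwise storm.zookeeper.servers.
    // This fallback is an assumption, not confirmed behaviour.
    @SuppressWarnings("unchecked")
    public static List<String> transactionalZkServers(Map<String, Object> conf) {
        Object servers = conf.get("transactional.zookeeper.servers");
        if (servers == null) {
            servers = conf.get("storm.zookeeper.servers");
        }
        return (List<String>) servers;
    }

    // Stand-in for a storm.yaml that points at an external ZooKeeper ensemble.
    public static Map<String, Object> exampleConf() {
        Map<String, Object> conf = new HashMap<String, Object>();
        // Hypothetical host names, for illustration only.
        conf.put("storm.zookeeper.servers",
                 Arrays.asList("zk1.example.com", "zk2.example.com"));
        conf.put("storm.zookeeper.port", 2181);
        return conf;
    }

    public static void main(String[] args) {
        System.out.println(transactionalZkServers(exampleConf()));
    }
}
```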
I can see it's storing batch IDs under /transactional/{spout_name} and,
like you said, deleting the data under the /transactional root has fixed the
exception.
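For the archives, my understanding of the warning quoted below: the offset saved in the spout metadata (708429188) was no longer present in Kafka, so the fetch was out of range and the spout retried from the configured start offset time. In Kafka's offset request API, -2 means the earliest available offset and -1 means the latest. A simplified sketch of that decision follows. It is not the actual storm-kafka code, and the class name, method name and example offset range are made up for illustration:

```java
public class OffsetFallbackSketch {

    // Special "times" understood by Kafka's offset request API.
    public static final long EARLIEST_TIME = -2L;
    public static final long LATEST_TIME = -1L;

    // Picks the offset to fetch from.
    //   stored          offset remembered in the spout metadata
    //   earliest        first offset still available in the partition
    //   latest          log end offset (next offset to be written)
    //   startOffsetTime configured fallback, -2 (earliest) or -1 (latest)
    public static long chooseFetchOffset(long stored, long earliest,
                                         long latest, long startOffsetTime) {
        boolean inRange = stored >= earliest && stored <= latest;
        if (inRange) {
            return stored; // metadata is still valid, resume where we left off
        }
        if (startOffsetTime == EARLIEST_TIME) {
            return earliest;
        }
        if (startOffsetTime == LATEST_TIME) {
            return latest;
        }
        throw new IllegalArgumentException(
            "sketch only handles -2 (earliest) and -1 (latest)");
    }

    public static void main(String[] args) {
        // Hypothetical partition whose oldest retained offset is past the stored one.
        long next = chooseFetchOffset(708429188L, 900000000L, 950000000L, EARLIEST_TIME);
        System.out.println("fetch from offset " + next);
    }
}
```

With stale metadata the stored offset is ignored and the configured start offset wins, which is consistent with clearing the /transactional data making the problem go away.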
Just wondering, how do the batch IDs translate to Kafka offsets? Is there a
way to find out which Kafka offset trident has consumed up to, by looking
in Zookeeper?
Thanks,
Josh
On Wed, May 6, 2015 at 2:09 PM, Nikhil Singh <[email protected]> wrote:
> Hi Josh,
> I think you need to clear the Kafka spout's metadata from the trident
> zookeeper. It has old metadata stored and the offset it has stored is no
> longer present in the Kafka queues.
>
> I think you can configure it to ignore metadata and always read from the
> head or tail.
>
> -Nikhil
>
> On Wednesday, May 6, 2015 5:24 AM, Josh <[email protected]> wrote:
>
>
> I still don't have a good understanding of how/where offsets are stored
> when using the trident spouts and how to configure it.
>
> I'm getting this error at the moment (the topology was running fine a
> couple of days ago...) related to offsets, and not sure how to investigate
> it:
>
> *2015-05-06T12:09:20.414+0200 s.k.KafkaUtils [WARN] Got fetch request with
> offset out of range: [708429188]; retrying with default start offset time
> from configuration.*
> * configured start offset time: [-2]*
> *2015-05-06T12:09:20.415+0200 b.s.util [ERROR] Async loop died!*
>
>
> I'm using storm 0.9.4 with the OpaqueTridentKafkaSpout
>
> Any ideas how I can resolve this? :)
>
>
> On Fri, May 1, 2015 at 2:05 PM, Josh <[email protected]> wrote:
>
> Hi Taylor,
>
> Could you be a bit more specific please? I can't find where the wiki
> describes the trident kafka spout storing offsets or where it's done in the
> code base.
>
> The wiki explains that SpoutConfig (for the KafkaSpout) has the zkRoot
> property which determines where it stores zookeeper offsets. But
> TridentKafkaConfig (for the TransactionalTridentKafkaSpout and
> OpaqueTridentKafkaSpout) does not have this property.
>
> Thanks,
> Josh
>
> On Fri, May 1, 2015 at 12:27 PM, P. Taylor Goetz <[email protected]>
> wrote:
>
> Hi Josh,
>
> The trident kafka spout stores offsets in zookeeper as well. See:
>
>
> https://github.com/apache/storm/tree/master/external/storm-kafka
>
> -Taylor
>
> On May 1, 2015, at 5:00 AM, Josh <[email protected]> wrote:
>
> Looks like this was some kind of network issue... I ran the topology on
> another box and don't have the problem any more.
>
> I have a related question about the TransactionalTridentKafkaSpout and
> OpaqueTridentKafkaSpout though: does anyone know where these spouts store
> their partition offsets to use for recovery in the case of failure?
>
> I know the normal (non-trident) Kafka spout stores its offsets in
> Zookeeper, and it has a spout config setting ZkHosts which determines where
> they are stored. But the trident Kafka spouts don't have this setting.
> Looking at the source code I can't find any place where it is persisting
> offsets. Plus it's using the SimpleConsumer so it's not relying on Kafka
> persisting its offsets.
>
> On Wed, Apr 29, 2015 at 4:29 PM, Josh <[email protected]> wrote:
>
> Hi all,
>
> I'm using the OpaqueTridentKafkaSpout to read batches of tuples from
> Kafka. I'm doing some filtering and aggregation and then calling
> persistentAggregate to maintain a map state:
>
> stream
>     .each(new Fields("msg"), new MyFilter())
>     .each(new Fields("msg"), new ExtractFieldsFunction(), new Fields("id"))
>     .groupBy(new Fields("id"))
>     .persistentAggregate(MyMapState.getNonTransactional, new CountAggregator(), new Fields("count"))
>     .parallelismHint(1)
>
> It works fine for the first batch, but then I am having a very strange
> problem where after the first batch my map state is no longer called (i.e.
> there is a call to multiGet and multiPut for the first batch only).
>
> The spout is still providing tuples, as when I debug I can see that the
> filter and function both continue to process input tuples (indefinitely).
> But the map state never gets called again!
>
> Why would this happen?
>
> I found a couple of very similar threads to this, but they have gone
> unanswered:
>
> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJ
>
> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ
>
>
> Thanks for any help on this,
>
> Josh
>