Yes, setting it to head or tail is application-specific.
The spout should eventually catch up, except in one case: if the offset defaults to
-2 (the earliest offset) and data is written to Kafka faster than it can be
consumed, then this missing-offset exception will keep popping up, because the
offset stored in Zookeeper for the current transaction will have been dropped
from the queue by the time the next transaction is processed (the queue is of a
fixed size, so old records are evicted).
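For what it's worth, storm-kafka has a couple of knobs aimed at exactly this race. A rough, untested sketch (field names are from storm-kafka 0.9.x, and the host/topic are placeholders, so verify against your version):

    // Untested sketch -- storm-kafka 0.9.x field names; host and topic are placeholders.
    TridentKafkaConfig conf = new TridentKafkaConfig(new ZkHosts("zkhost:2181"), "mytopic");
    conf.useStartOffsetTimeIfOffsetOutOfRange = true; // fall back to startOffsetTime when the stored offset has been aged out
    conf.maxOffsetBehind = 100000; // if the stored offset lags further than this, reset rather than replay everything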
-Nikhil
On Wednesday, May 6, 2015 10:36 AM, Jeff Maass <[email protected]> wrote:
Is there a way to find out which Kafka offset trident has consumed up to, by
looking in Zookeeper?
----------------------------------------------------------------------
There is an app, specific to Kafka, which will monitor a consumer's offset. I would
recommend only using it in testing, as I believe, in my environment, it caused
locking of some sort. Note that it is looking at the offset data stored by
Kafka, not the offset data stored by the spout.
https://github.com/quantifind/KafkaOffsetMonitor

Regarding the statement: "I think you can configure it to ignore metadata and
always read from the head or tail."
--------------------
That configuration should only be done if the logic of your application is OK with:
* head: replaying previously processed records
* tail: not playing / skipping an unknown volume of records

I wouldn't necessarily "reset" the offsets in Zookeeper. Shouldn't the spout
eventually catch up to the Kafka partitions / find a valid / existing offset?
Like this:
try 3, exception
try 4, exception
try 5, exception
try 6, returns valid data
From: Josh <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, May 6, 2015 at 09:43
To: "[email protected]" <[email protected]>, Nikhil Singh
<[email protected]>
Subject: Re: Trident persistentAggregate only working for the first batch!
Hi Nikhil,
Thanks for the reply. OK that makes sense, I was trying to figure out where
trident is storing stuff in zookeeper - turns out when I was using LocalCluster
it was starting its own local Zookeeper instance on port 2000. But I've figured
out how to configure it now: by setting the storm.zookeeper.servers property in
storm.yaml.
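For reference, the relevant storm.yaml entries look something like this (hostnames are placeholders):

    storm.zookeeper.servers:
        - "zk1.example.com"
        - "zk2.example.com"
    storm.zookeeper.port: 2181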
I can see it's storing batch IDs under /transactional/{spout_name} and, like
you said, deleting the data under the /transactional root has fixed the
exception.
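In case it's useful to anyone, here's the rough sketch I've been using to poke at those nodes with the plain Zookeeper client (the spout name and path layout below are illustrative; adjust to whatever you see in your ensemble):

    import org.apache.zookeeper.ZooKeeper;

    public class TransactionalStatePeek {
        public static void main(String[] args) throws Exception {
            // Same ensemble as storm.zookeeper.servers (placeholder host below)
            ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, event -> { });
            String root = "/transactional/my_spout"; // illustrative spout name
            for (String child : zk.getChildren(root, false)) {
                byte[] data = zk.getData(root + "/" + child, false, null);
                System.out.println(child + " -> " + (data == null ? "(empty)" : new String(data, "UTF-8")));
            }
            zk.close();
        }
    }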
Just wondering, how do the batch IDs translate to Kafka offsets? Is there a way
to find out which Kafka offset trident has consumed up to, by looking in
Zookeeper?
Thanks,
Josh
On Wed, May 6, 2015 at 2:09 PM, Nikhil Singh <[email protected]> wrote:
Hi Josh,
I think you need to clear the Kafka spout's metadata from the trident
Zookeeper. It has old metadata stored, and the offset it has stored is no longer
present in the Kafka queues.
I think you can configure it to ignore the metadata and always read from the head
or tail.
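Something like this, I believe (untested sketch; 0.9.x field names, and the host/topic are placeholders):

    TridentKafkaConfig conf = new TridentKafkaConfig(new ZkHosts("zkhost:2181"), "mytopic");
    conf.forceFromStart = true; // ignore the metadata stored in Zookeeper
    conf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // head; LatestTime() for tail
    OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(conf);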
-Nikhil
On Wednesday, May 6, 2015 5:24 AM, Josh <[email protected]> wrote:
I still don't have a good understanding of how/where offsets are stored when
using the trident spouts and how to configure it.
I'm getting this error at the moment (the topology was running fine a couple of
days ago...) related to offsets, and not sure how to investigate it:
2015-05-06T12:09:20.414+0200 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [708429188]; retrying with default start offset time from configuration. configured start offset time: [-2]
2015-05-06T12:09:20.415+0200 b.s.util [ERROR] Async loop died!
I'm using Storm 0.9.4 with the OpaqueTridentKafkaSpout.
Any ideas how I can resolve this? :)
On Fri, May 1, 2015 at 2:05 PM, Josh <[email protected]> wrote:
Hi Taylor,
Could you be a bit more specific please? I can't find where the wiki describes
the trident kafka spout storing offsets or where it's done in the code base.
The wiki explains that SpoutConfig (for the KafkaSpout) has the zkRoot property
which determines where it stores zookeeper offsets. But TridentKafkaConfig (for
the TransactionalTridentKafkaSpout and OpaqueTridentKafkaSpout) does not have
this property.
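For comparison, the non-trident config spells the location out, something like this (values are placeholders):

    // zkRoot and id together determine where offsets are written in Zookeeper
    SpoutConfig sc = new SpoutConfig(new ZkHosts("zkhost:2181"), "mytopic", "/kafka-offsets", "my-spout-id");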
Thanks,
Josh
On Fri, May 1, 2015 at 12:27 PM, P. Taylor Goetz <[email protected]> wrote:
Hi Josh,
The trident kafka spout stores offsets in zookeeper as well. See:
https://github.com/apache/storm/tree/master/external/storm-kafka
-Taylor
On May 1, 2015, at 5:00 AM, Josh <[email protected]> wrote:
Looks like this was some kind of network issue... I ran the topology on another
box and don't have the problem any more.
I have a related question about the TransactionalTridentKafkaSpout and
OpaqueTridentKafkaSpout though: does anyone know where these spouts store their
partition offsets to use for recovery in the case of failure?
I know the normal (non-trident) Kafka spout stores its offsets in Zookeeper,
and it has a spout config setting ZkHosts which determines where they are
stored. But the trident Kafka spouts don't have this setting. Looking at the
source code I can't find any place where it is persisting offsets. Plus it's
using the SimpleConsumer so it's not relying on Kafka persisting its offsets.
On Wed, Apr 29, 2015 at 4:29 PM, Josh <[email protected]> wrote:
Hi all,
I'm using the OpaqueTridentKafkaSpout to read batches of tuples from Kafka. I'm
doing some filtering and aggregation, and then calling persistentAggregate to
maintain a map state:
stream
    .each(new Fields("msg"), new MyFilter())
    .each(new Fields("msg"), new ExtractFieldsFunction(), new Fields("id"))
    .groupBy(new Fields("id"))
    .persistentAggregate(MyMapState.getNonTransactional, new CountAggregator(), new Fields("count"))
    .parallelismHint(1);
It works fine for the first batch, but then I'm having a very strange problem:
after the first batch my map state is no longer called (i.e. there is a call to
multiGet and multiPut for the first batch only).
The spout is still providing tuples, as when I debug I can see that the filter
and function both continue to process input tuples (indefinitely). But the map
state never gets called again!
Why would this happen?
I found a couple of very similar threads to this, but they have gone unanswered:
https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJ
https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ
Thanks for any help on this,
Josh