Hi Josh,

I think you need to clear the Kafka spout's metadata from the Trident Zookeeper state. It has old metadata stored, and the offset it has stored is no longer present in the Kafka queues.

I think you can configure it to ignore the stored metadata and always read from the head or tail.
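Something along these lines should do it. I'm going from memory on the 0.9.x storm-kafka field names here, so treat it as a rough sketch and double-check them against your version:

import storm.kafka.ZkHosts;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;

// Placeholder Zookeeper address and topic name, just for illustration.
TridentKafkaConfig spoutConf =
        new TridentKafkaConfig(new ZkHosts("zk1:2181"), "my-topic");

// Ignore the offset metadata already stored in Zookeeper...
spoutConf.forceFromStart = true;
// ...and start from the tail of the queue (LatestTime, i.e. -1).
// Use kafka.api.OffsetRequest.EarliestTime() (-2) to replay from the head.
spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);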
-Nikhil
On Wednesday, May 6, 2015 5:24 AM, Josh <[email protected]> wrote:
I still don't have a good understanding of how/where offsets are stored when
using the trident spouts and how to configure it.
I'm getting this error related to offsets at the moment (the topology was running fine a couple of days ago...), and I'm not sure how to investigate it:
2015-05-06T12:09:20.414+0200 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [708429188]; retrying with default start offset time from configuration. configured start offset time: [-2]
2015-05-06T12:09:20.415+0200 b.s.util [ERROR] Async loop died!
I'm using Storm 0.9.4 with the OpaqueTridentKafkaSpout.
Any ideas how I can resolve this? :)
On Fri, May 1, 2015 at 2:05 PM, Josh <[email protected]> wrote:
Hi Taylor,
Could you be a bit more specific please? I can't find where the wiki describes
the trident kafka spout storing offsets or where it's done in the code base.
The wiki explains that SpoutConfig (for the KafkaSpout) has the zkRoot property, which determines where it stores its offsets in Zookeeper. But TridentKafkaConfig (for the TransactionalTridentKafkaSpout and OpaqueTridentKafkaSpout) does not have this property.
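For reference, this is roughly what I mean for the non-trident spout (the host, topic and ids below are just placeholders):

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

SpoutConfig spoutConf = new SpoutConfig(
        new ZkHosts("zk1:2181"),   // Zookeeper the Kafka brokers register in
        "my-topic",                // topic to consume
        "/kafka-spout-offsets",    // zkRoot: offsets are stored under this path
        "my-consumer-id");         // id appended to zkRoot for this consumer
KafkaSpout spout = new KafkaSpout(spoutConf);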
Thanks,
Josh
On Fri, May 1, 2015 at 12:27 PM, P. Taylor Goetz <[email protected]> wrote:
Hi Josh,
The trident kafka spout stores offsets in zookeeper as well. See:
https://github.com/apache/storm/tree/master/external/storm-kafka
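Unlike the regular spout, the location isn't configured on TridentKafkaConfig itself. The Trident/transactional metadata (including the Kafka offsets) goes under the topology's transactional Zookeeper settings. Roughly (from memory, so please verify the defaults against your version):

import backtype.storm.Config;
import java.util.Arrays;

Config conf = new Config();
// Transactional spout metadata lives under this Zookeeper root
// ("/transactional" is the usual default).
conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/transactional");
// By default Storm's own Zookeeper cluster is used; override here if needed.
conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList("zk1", "zk2"));
conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);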
-Taylor
On May 1, 2015, at 5:00 AM, Josh <[email protected]> wrote:
Looks like this was some kind of network issue... I ran the topology on another
box and don't have the problem any more.
I have a related question about the TransactionalTridentKafkaSpout and
OpaqueTridentKafkaSpout though: does anyone know where these spouts store their
partition offsets to use for recovery in the case of failure?
I know the normal (non-trident) Kafka spout stores its offsets in Zookeeper,
and it has a spout config setting ZkHosts which determines where they are
stored. But the trident Kafka spouts don't have this setting. Looking at the
source code, I can't find any place where they persist offsets. And since they use the SimpleConsumer, they aren't relying on Kafka to persist the offsets either.
On Wed, Apr 29, 2015 at 4:29 PM, Josh <[email protected]> wrote:
Hi all,
I'm using the OpaqueTridentKafkaSpout to read batches of tuples from Kafka. I'm doing some filtering and aggregation, and then calling persistentAggregate to maintain a map state:
stream
    .each(new Fields("msg"), new MyFilter())
    .each(new Fields("msg"), new ExtractFieldsFunction(), new Fields("id"))
    .groupBy(new Fields("id"))
    .persistentAggregate(MyMapState.getNonTransactional, new CountAggregator(), new Fields("count"))
    .parallelismHint(1);
It works fine for the first batch, but then I'm hitting a very strange problem where, after the first batch, my map state is no longer called (i.e. there is a call to multiGet and multiPut for the first batch only).
The spout is still providing tuples, as when I debug I can see that the filter
and function both continue to process input tuples (indefinitely). But the map
state never gets called again!
Why would this happen?
I found a couple of very similar threads to this, but they have gone unanswered:
https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJ
https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ
Thanks for any help on this,
Josh