I still don't have a good understanding of how/where offsets are stored
when using the trident spouts and how to configure it.

I'm getting the following offset-related error at the moment (the topology
was running fine a couple of days ago...), and I'm not sure how to
investigate it:

*2015-05-06T12:09:20.414+0200 s.k.KafkaUtils [WARN] Got fetch request with
offset out of range: [708429188]; retrying with default start offset time
from configuration.*
* configured start offset time: [-2]*
*2015-05-06T12:09:20.415+0200 b.s.util [ERROR] Async loop died!*
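
As far as I can tell, the -2 in that warning is Kafka's sentinel for
"earliest available offset" (-1 is "latest"), so the spout is retrying from
the oldest message the broker still has. Here is a minimal sketch of that
fallback as I understand it. It is not the storm-kafka code: the class, the
method names and the broker offset range are made up for illustration; only
708429188 and -2 come from the log above.

```java
public class OffsetFallback {
    // Kafka's sentinel "times" used when asking a broker for an offset.
    public static final long EARLIEST_TIME = -2L;
    public static final long LATEST_TIME = -1L;

    // True if the requested offset lies within what the broker still holds.
    // The latest offset (log end) is a valid fetch position.
    public static boolean inRange(long requested, long earliest, long latest) {
        return requested >= earliest && requested <= latest;
    }

    // Picks the offset to fetch from: the requested one if it is still valid,
    // otherwise the one selected by the configured start offset time.
    public static long resolveFetchOffset(long requested, long earliest,
                                          long latest, long startOffsetTime) {
        if (inRange(requested, earliest, latest)) {
            return requested;
        }
        if (startOffsetTime == EARLIEST_TIME) {
            return earliest;
        }
        if (startOffsetTime == LATEST_TIME) {
            return latest;
        }
        throw new IllegalArgumentException(
                "unsupported start offset time: " + startOffsetTime);
    }

    public static void main(String[] args) {
        // Hypothetical broker range: the earliest retained offset is already
        // past the offset the spout asked for (e.g. after log retention).
        long earliest = 710000000L;
        long latest = 715000000L;
        long next = resolveFetchOffset(708429188L, earliest, latest, EARLIEST_TIME);
        System.out.println(next); // prints 710000000
    }
}
```

If that reading is right, the warning on its own should be recoverable, and
the "Async loop died!" error that follows is the part that needs explaining.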


I'm using Storm 0.9.4 with the OpaqueTridentKafkaSpout.

Any ideas how I can resolve this? :)


On Fri, May 1, 2015 at 2:05 PM, Josh <[email protected]> wrote:

> Hi Taylor,
>
> Could you be a bit more specific please? I can't find where the wiki
> describes the trident kafka spout storing offsets or where it's done in the
> code base.
>
> The wiki explains that SpoutConfig (for the KafkaSpout) has the zkRoot
> property which determines where it stores zookeeper offsets. But
> TridentKafkaConfig (for the TransactionalTridentKafkaSpout and
> OpaqueTridentKafkaSpout) does not have this property.
>
> Thanks,
> Josh
>
> On Fri, May 1, 2015 at 12:27 PM, P. Taylor Goetz <[email protected]>
> wrote:
>
>> Hi Josh,
>>
>> The trident kafka spout stores offsets in zookeeper as well. See:
>>
>>
>> https://github.com/apache/storm/tree/master/external/storm-kafka
>>
>> -Taylor
>>
>> On May 1, 2015, at 5:00 AM, Josh <[email protected]> wrote:
>>
>> Looks like this was some kind of network issue... I ran the topology on
>> another box and don't have the problem any more.
>>
>> I have a related question about the TransactionalTridentKafkaSpout and
>> OpaqueTridentKafkaSpout though: does anyone know where these spouts store
>> their partition offsets to use for recovery in the case of failure?
>>
>> I know the normal (non-trident) Kafka spout stores its offsets in
>> Zookeeper, and it has a spout config setting ZkHosts which determines where
>> they are stored. But the trident Kafka spouts don't have this setting.
>> Looking at the source code I can't find any place where it is persisting
>> offsets. Plus it's using the SimpleConsumer so it's not relying on Kafka
>> persisting its offsets.
>>
>> On Wed, Apr 29, 2015 at 4:29 PM, Josh <[email protected]> wrote:
>>
>>> Hi all,
>>>
>>> I'm using the OpaqueTridentKafkaSpout to read batches of tuples from
>>> Kafka. I'm doing some filtering and aggregation and then calling
>>> persistentAggregate to maintain a map state:
>>>
>>> stream
>>>       .each(new Fields("msg"), new MyFilter())
>>>       .each(new Fields("msg"), new ExtractFieldsFunction(), new Fields("id"))
>>>       .groupBy(new Fields("id"))
>>>       .persistentAggregate(MyMapState.getNonTransactional, new CountAggregator(), new Fields("count"))
>>>       .parallelismHint(1)
>>>
>>> It works fine for the first batch, but then I hit a very strange
>>> problem: after the first batch my map state is no longer called (i.e.
>>> there is a call to multiGet and multiPut for the first batch only).
>>>
>>> The spout is still providing tuples, as when I debug I can see that the
>>> filter and function both continue to process input tuples (indefinitely).
>>> But the map state never gets called again!
>>>
>>> Why would this happen?
>>>
>>> I found a couple of very similar threads to this, but they have gone
>>> unanswered:
>>>
>>> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJ
>>>
>>> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ
>>>
>>>
>>> Thanks for any help on this,
>>>
>>> Josh
>>>
>>
>>
>
