Hi Taylor,

Could you be a bit more specific please? I can't find where the wiki
describes the trident kafka spout storing offsets or where it's done in the
code base.

The wiki explains that SpoutConfig (for the KafkaSpout) has the zkRoot
property which determines where it stores zookeeper offsets. But
TridentKafkaConfig (for the TransactionalTridentKafkaSpout and
OpaqueTridentKafkaSpout) does not have this property.

Thanks,
Josh

On Fri, May 1, 2015 at 12:27 PM, P. Taylor Goetz <[email protected]> wrote:

> Hi Josh,
>
> The trident kafka spout stores offsets in zookeeper as well. See:
>
>
> https://github.com/apache/storm/tree/master/external/storm-kafka
>
> -Taylor
>
> On May 1, 2015, at 5:00 AM, Josh <[email protected]> wrote:
>
> Looks like this was some kind of network issue... I ran the topology on
> another box and don't have the problem any more.
>
> I have a related question about the TransactionalTridentKafkaSpout and
> OpaqueTridentKafkaSpout though: does anyone know where these spouts store
> their partition offsets to use for recovery in the case of failure?
>
> I know the normal (non-trident) Kafka spout stores its offsets in
> Zookeeper, and it has a spout config setting ZkHosts which determines where
> they are stored. But the trident Kafka spouts don't have this setting.
> Looking at the source code I can't find any place where it is persisting
> offsets. Plus it's using the SimpleConsumer so it's not relying on Kafka
> persisting its offsets.
>
> On Wed, Apr 29, 2015 at 4:29 PM, Josh <[email protected]> wrote:
>
>> Hi all,
>>
>> I'm using the OpaqueTridentKafkaSpout to read batches of tuples from
>> Kafka, I'm doing some filtering and aggregation and then calling
>> persistAggregate to maintain a map state:
>>
>> stream
>>       .each(new Fields("msg"), new MyFilter())
>>       .each(new Fields("msg"), new ExtractFieldsFunction(), new
>> Fields("id"))
>>       .groupBy(new Fields("id"))
>>       .persistentAggregate(MyMapState.getNonTransactional, new
>> CountAggregator(), new Fields("count"))
>>       .parallelismHint(1)
>>
>> It works fine, for the first batch, but then I am having a very strange
>> problem where after the first batch my map state is no longer called. (i.e.
>> there a call to multiGet and multiPut for the first batch only).
>>
>> The spout is still providing tuples, as when I debug I can see that the
>> filter and function both continue to process input tuples (indefinitely).
>> But the map state never gets called again!
>>
>> Why would this happen?
>>
>> I found a couple of very similar threads to this, but they have gone
>> unanswered:
>>
>> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJ
>>
>> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ
>>
>>
>> Thanks for any help on this,
>>
>> Josh
>>
>
>

Reply via email to