Sorry I meant transactional.zookeeper.servers in storm.yaml!
On Wed, May 6, 2015 at 3:43 PM, Josh <[email protected]> wrote:
> Hi Nikhil,
>
> Thanks for the reply. OK that makes sense, I was trying to figure out
> where trident is storing stuff in zookeeper - turns out when I was using
> LocalCluster it was starting its own local Zookeeper instance on port 2000.
> But I've figured out how to configure it now: by setting the
> storm.zookeeper.servers property in storm.yaml.
>
> I can see it's storing batch IDs under: /transactional/{spout_name} and
> like you said deleting the data under the /transactional root has fixed the
> exception.
>
> Just wondering, how do the batch IDs translate to Kafka offsets? Is there
> a way to find out which Kafka offset trident has consumed up to, by looking
> in Zookeeper?
>
> Thanks,
> Josh
>
>
>
>
> On Wed, May 6, 2015 at 2:09 PM, Nikhil Singh <[email protected]>
> wrote:
>
>> Hi Josh,
>> I think you need to clear the Kafka spout's metadata from the trident
>> zookeeper. It has old metadata stored and the offset it has stored is no
>> longer present in the Kafka queues.
>>
>> I think you can configure it to ignore metadata and always read from the
>> head or tail..
>>
>> -Nikhil
>>
>>
>>
>>
>> On Wednesday, May 6, 2015 5:24 AM, Josh <[email protected]> wrote:
>>
>>
>> I still don't have a good understanding of how/where offsets are stored
>> when using the trident spouts and how to configure it.
>>
>> I'm getting this error at the moment (the topology was running fine a
>> couple of days ago...) related to offsets, and not sure how to investigate
>> it:
>>
>> *2015-05-06T12:09:20.414+0200 s.k.KafkaUtils [WARN] Got fetch request
>> with offset out of range: [708429188]; retrying with default start offset
>> time from configuration.*
>> * configured start offset time: [-2]*
>> *2015-05-06T12:09:20.415+0200 b.s.util [ERROR] Async loop died!*
>>
>>
>> I'm using storm 0.9.4 with the OpaqueTrdientKafkaSpout
>>
>> Any ideas how I can resolve this? :)
>>
>>
>> On Fri, May 1, 2015 at 2:05 PM, Josh <[email protected]> wrote:
>>
>> Hi Taylor,
>>
>> Could you be a bit more specific please? I can't find where the wiki
>> describes the trident kafka spout storing offsets or where it's done in the
>> code base.
>>
>> The wiki explains that SpoutConfig (for the KafkaSpout) has the zkRoot
>> property which determines where it stores zookeeper offsets. But
>> TridentKafkaConfig (for the TransactionalTridentKafkaSpout and
>> OpaqueTridentKafkaSpout) does not have this property.
>>
>> Thanks,
>> Josh
>>
>> On Fri, May 1, 2015 at 12:27 PM, P. Taylor Goetz <[email protected]>
>> wrote:
>>
>> Hi Josh,
>>
>> The trident kafka spout stores offsets in zookeeper as well. See:
>>
>>
>> https://github.com/apache/storm/tree/master/external/storm-kafka
>>
>> -Taylor
>>
>> On May 1, 2015, at 5:00 AM, Josh <[email protected]> wrote:
>>
>> Looks like this was some kind of network issue... I ran the topology on
>> another box and don't have the problem any more.
>>
>> I have a related question about the TransactionalTridentKafkaSpout and
>> OpaqueTridentKafkaSpout though: does anyone know where these spouts store
>> their partition offsets to use for recovery in the case of failure?
>>
>> I know the normal (non-trident) Kafka spout stores its offsets in
>> Zookeeper, and it has a spout config setting ZkHosts which determines where
>> they are stored. But the trident Kafka spouts don't have this setting.
>> Looking at the source code I can't find any place where it is persisting
>> offsets. Plus it's using the SimpleConsumer so it's not relying on Kafka
>> persisting its offsets.
>>
>> On Wed, Apr 29, 2015 at 4:29 PM, Josh <[email protected]> wrote:
>>
>> Hi all,
>>
>> I'm using the OpaqueTridentKafkaSpout to read batches of tuples from
>> Kafka, I'm doing some filtering and aggregation and then calling
>> persistAggregate to maintain a map state:
>>
>> stream
>> .each(new Fields("msg"), new MyFilter())
>> .each(new Fields("msg"), new ExtractFieldsFunction(), new
>> Fields("id"))
>> .groupBy(new Fields("id"))
>> .persistentAggregate(MyMapState.getNonTransactional, new
>> CountAggregator(), new Fields("count"))
>> .parallelismHint(1)
>>
>> It works fine, for the first batch, but then I am having a very strange
>> problem where after the first batch my map state is no longer called. (i.e.
>> there a call to multiGet and multiPut for the first batch only).
>>
>> The spout is still providing tuples, as when I debug I can see that the
>> filter and function both continue to process input tuples (indefinitely).
>> But the map state never gets called again!
>>
>> Why would this happen?
>>
>> I found a couple of very similar threads to this, but they have gone
>> unanswered:
>>
>> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/wJbKbHDxRqI/KCiBE5lTdGEJ
>>
>> https://groups.google.com/forum/#!searchin/storm-user/trident$20state/storm-user/OlYUoCjx-uU/99XdNkdaSnIJ
>>
>>
>> Thanks for any help on this,
>>
>> Josh
>>
>>
>>
>>
>>
>>
>>
>