Are you sure you are producing new messages into the same Kafka topic? What
number did you set maxSpoutPending to?
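
(If you want to double-check the producer side: assuming a Kafka 0.8-era install on the kafka1 host, the stock console consumer can show whether new messages are actually reaching the topic. Paths are from memory, so adjust for your install.)

```shell
# Run from the Kafka install directory on kafka1 (path assumed).
# If new messages appear here but the topology still sees nothing,
# the problem is on the Storm/spout side, not the producer side.
bin/kafka-console-consumer.sh --zookeeper zookeeper1:2181 \
    --topic scores --from-beginning
```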

On Tuesday, July 8, 2014, Miloš Solujić <[email protected]> wrote:

> Thanks, Danijel, for your quick suggestion.
>
> I tried lowering and then removing all the performance settings (they were
> left over from load testing on a single machine).
>
> Still the same result: no matter what, new messages are not consumed from
> Kafka after the topology is redeployed.
>
>
> On Tue, Jul 8, 2014 at 6:15 PM, Danijel Schiavuzzi <[email protected]> wrote:
>
>> Try lowering setMaxSpoutPending(100000) to a much lower value (like 10).
>> In Trident, setMaxSpoutPending refers to the number of batches, not tuples
>> as in plain Storm. Values that are too high can cause stalls like the one
>> you describe.
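
A rough back-of-the-envelope illustration (plain Java, using the two values from the config in this thread; the worst case is an upper bound, not what Storm will necessarily keep in flight):

```java
public class PendingMath {
    // Worst-case number of unacked tuples: in-flight batches times batch size.
    static long worstCaseTuples(int maxSpoutPending, int maxBatchSize) {
        return (long) maxSpoutPending * maxBatchSize;
    }

    public static void main(String[] args) {
        // Values taken from the topology config in this thread:
        // setMaxSpoutPending(100000) and MAX_BATCH_SIZE_CONF = 100.
        System.out.println(worstCaseTuples(100000, 100)); // prints 10000000
        // With the suggested value of 10, the bound drops to 1000 tuples.
        System.out.println(worstCaseTuples(10, 100));     // prints 1000
    }
}
```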
>>
>>
>> On Tuesday, July 8, 2014, Miloš Solujić <[email protected]> wrote:
>>
>>> Hi all,
>>>
>>> I'm pretty new to Storm and Kafka/ZooKeeper, and I hope my question
>>> is not too dumb. Here it goes:
>>>
>>> I'm using the latest stable Storm and storm-kafka = 0.9.2-incubating.
>>>
>>> I've set up a test cluster using the Wirbelsturm tool with an unchanged
>>> YAML (I just uncommented the Kafka machine).
>>>
>>> here is config snippet for my trident topology:
>>>
>>>         BrokerHosts zk = new ZkHosts("zookeeper1");
>>>         TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zk, "scores");
>>>
>>>         kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>         kafkaConf.fetchSizeBytes = 10000;
>>>         kafkaConf.forceFromStart = true;
>>>
>>>         Config stormConfig = new Config();
>>>         stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP);
>>>         stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE);
>>>         stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET);
>>>         stormConfig.put("couchbase.password", COUCHBASE_PASSWORD);
>>>         // performance settings
>>>         stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100);
>>>         stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);
>>>         stormConfig.setMaxSpoutPending(100000);
>>>
>>>         if (args != null && args.length > 0) {
>>>             StormSubmitter.submitTopologyWithProgressBar(args[0], stormConfig,
>>>                     BuildTridentScoreTopology.build(kafkaConf));
>>>         } else {...}
>>>
>>> Now, I created the 'scores' topic in Kafka and pushed a few test messages
>>> prior to starting the topology, with kafkaConf.forceFromStart = true. The
>>> topology processed those messages just fine and stored them in the
>>> TridentState (Couchbase).
>>>
>>> But all new messages are simply ignored!
>>>
>>> After redeploying the topology (both with forceFromStart = true and
>>> forceFromStart = false), no more messages are ingested from Kafka.
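
For reference, this is my understanding of how the two offset-related knobs in storm-kafka interact (a sketch only; startOffsetTime is a real KafkaConfig field, but double-check the defaults in your 0.9.2-incubating sources):

```java
// Sketch of storm-kafka (0.9.2-incubating era) offset behaviour, as I
// understand it -- check against your version's KafkaConfig source.
TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zk, "scores");

// forceFromStart = true: ignore the offsets the spout committed to
// ZooKeeper and start from startOffsetTime instead.
kafkaConf.forceFromStart = true;
kafkaConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

// forceFromStart = false: resume from the last offsets committed to
// ZooKeeper by a previous run of the same spout id.
```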
>>>
>>> Here is the worker log for one topology deployment and a short run:
>>> http://pastie.org/private/4xsk6pijvmulwrcg7zgca
>>>
>>> These are the VMs that host the Storm cluster:
>>> 10.0.0.241 zookeeper1
>>> 10.0.0.101 supervisor1
>>> 10.0.0.21 kafka1
>>> 10.0.0.251 nimbus1
>>>
>>> Thanks,
>>> Milos
>>>
>>
>> --
>> Danijel Schiavuzzi
>>
>> E: [email protected]
>> W: www.schiavuzzi.com
>> T: +385989035562
>> Skype: danijels7
>>
>
>

-- 
Danijel Schiavuzzi

E: [email protected]
W: www.schiavuzzi.com
T: +385989035562
Skype: danijels7
