Try lowering setMaxSpoutPending(100000) to a much lower value (like 10). In
Trident, setMaxSpoutPending refers to the number of batches, not tuples
as in plain Storm. Values that are too high may cause blockages like the
one you describe.
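
To get a feel for the difference, here is a rough back-of-the-envelope
sketch in plain Java (no Storm dependency). It assumes, as a simplification,
that each pending batch can hold up to fetchSizeBytes of Kafka data per
partition; the class and method names are made up for illustration. The
actual change in your topology would just be stormConfig.setMaxSpoutPending(10).

```java
// Rough upper bound on data in flight for a Trident topology.
// Assumption (not from Storm itself): every pending batch holds up to
// `perBatch` units (bytes or tuples); real batches are often smaller.
final class PendingEstimate {

    // Trident counts max spout pending in batches, so the bound is
    // (pending batches) x (units per batch). multiplyExact throws on overflow.
    static long maxInFlight(long maxSpoutPending, long perBatch) {
        return Math.multiplyExact(maxSpoutPending, perBatch);
    }

    public static void main(String[] args) {
        // kafkaConf.fetchSizeBytes from the quoted config
        long fetchSizeBytes = 10_000L;

        // Value from the quoted config: 100000 pending batches
        System.out.println(maxInFlight(100_000L, fetchSizeBytes));

        // Suggested value: 10 pending batches
        System.out.println(maxInFlight(10L, fetchSizeBytes));
    }
}
```

Under that assumption, the original setting allows on the order of a
gigabyte of unacknowledged data per partition, while a value of 10 keeps it
around 100 KB.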

On Tuesday, July 8, 2014, Miloš Solujić <[email protected]> wrote:

> Hi all,
>
> I'm pretty new to storm and kafka/zookeeper, and I hope that my question
> is not too dumb. Here it goes:
>
> I'm using the latest stable storm and storm-kafka = 0.9.2-incubating
>
> I've set up a test cluster using the wirbelsturm tool with an unchanged
> yaml (just uncommented the kafka machine)
>
> here is config snippet for my trident topology:
>
>         BrokerHosts zk = new ZkHosts("zookeeper1");
>         TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zk, "scores");
>
>         kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>         kafkaConf.fetchSizeBytes = 10000;
>         kafkaConf.forceFromStart = true;
>
>         Config stormConfig = new Config();
>         stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP);
>         stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE);
>         stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET);
>         stormConfig.put("couchbase.password", COUCHBASE_PASSWORD);
>         // performance settings
>         stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100);
>         stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);
>         stormConfig.setMaxSpoutPending(100000);
>
>
>         if (args != null && args.length > 0) {
>             StormSubmitter.submitTopologyWithProgressBar(args[0], stormConfig,
>                     BuildTridentScoreTopology.build(kafkaConf));
>         } else {...}
>
> Now, I've created the 'scores' topic in kafka and pushed a few test
> messages prior to starting the topology, with kafkaConf.forceFromStart =
> true. The topology processed those messages just fine, and stored them in
> tridentState (Couchbase)
>
> All new messages are simply ignored!
>
> After redeploying the topology (both with forceFromStart = true and
> forceFromStart = false) no more messages are ingested from kafka.
>
> here is the worker log for one topology deployment and a short run
> http://pastie.org/private/4xsk6pijvmulwrcg7zgca
>
> those are VMs that host this storm cluster
> 10.0.0.241 zookeeper1
> 10.0.0.101 supervisor1
> 10.0.0.21 kafka1
> 10.0.0.251 nimbus1
>
> Thanks,
> Milos

-- 
Danijel Schiavuzzi

E: [email protected]
W: www.schiavuzzi.com
T: +385989035562
Skype: danijels7
