Thanks Danijel for your quick proposition. I tried lowering down and removing all performance settings (those were left from load testing on one machine)
Still same result: no matter what, new messages are not taken from kafka after topology is redeployed. On Tue, Jul 8, 2014 at 6:15 PM, Danijel Schiavuzzi <[email protected]> wrote: > Try lowering setMaxSpoutPending(100000) to a much lower value (like 10). > In Trident, setMaxSpoutPending referns to the number of batches, not tuples > like in plain Storm. Too high values may cause blockages like the one you > describe. > > > On Tuesday, July 8, 2014, Miloš Solujić <[email protected]> wrote: > >> Hi all, >> >> I'm pretty new to storm and kafka/zookeeper, and I hope that my question >> is not to dumb. Here it goes: >> >> I'm using latest stable storm and storm-kafka = 0.9.2-incubating >> >> I've setup test cluster using wirbelsturm tool with unchanged yaml (just >> uncommented kafka machine) >> >> here is config snippet for my trident topology: >> >> BrokerHosts zk = new ZkHosts("zookeeper1"); >> TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zk, >> "scores"); >> >> kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme()); >> kafkaConf.fetchSizeBytes = 10000; >> kafkaConf.forceFromStart = true; >> >> Config stormConfig = new Config(); >> stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP); >> stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE); >> stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET); >> stormConfig.put("couchbase.password", COUCHBASE_PASSWORD); >> // performance settings >> >> stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100); >> stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100); >> stormConfig.setMaxSpoutPending(100000); >> >> >> if (args != null && args.length > 0) { >> >> StormSubmitter.submitTopologyWithProgressBar(args[0], >> stormConfig, >> BuildTridentScoreTopology.build(kafkaConf)); >> } else {...} >> >> Now, I've created 'scores' topic in kafka and pushed few test messages >> prior to starting topology, with kafkaConf.forceFromStart = true. And >> topology processed those messages just fine, and stored them in >> tridentState (couhbase) >> >> All new messages are simply ignored! >> >> After redeploying topology (both with forceFromStart = true and >> forceFromStart = false) no more messages are ingested from kafka. >> >> here is worker log for one topology deployment and short run >> http://pastie.org/private/4xsk6pijvmulwrcg7zgca >> >> those are VMs that host this storm cluster >> 10.0.0.241 zookeeper1 >> 10.0.0.101 supervisor1 >> 10.0.0.21 kafka1 >> 10.0.0.251 nimbus1 >> >> Thanks, >> Milos >> >> >> >> >> >> >> >> > > -- > Danijel Schiavuzzi > > E: [email protected] > W: www.schiavuzzi.com > T: +385989035562 > Skype: danijels7 >
