Are you sure you are producing new messages into the same Kafka topic? What number did you set maxSpoutPending to?
On Tuesday, July 8, 2014, Miloš Solujić <[email protected]> wrote:

> Thanks Danijel for your quick suggestion.
>
> I tried lowering and then removing all performance settings (those were
> left over from load testing on one machine).
>
> Still the same result: no matter what, new messages are not taken from
> Kafka after the topology is redeployed.
>
> On Tue, Jul 8, 2014 at 6:15 PM, Danijel Schiavuzzi <[email protected]> wrote:
>
>> Try lowering setMaxSpoutPending(100000) to a much lower value (like 10).
>> In Trident, setMaxSpoutPending refers to the number of batches, not tuples
>> as in plain Storm. Too high a value may cause blockages like the one you
>> describe.
>>
>> On Tuesday, July 8, 2014, Miloš Solujić <[email protected]> wrote:
>>
>>> Hi all,
>>>
>>> I'm pretty new to Storm and Kafka/ZooKeeper, and I hope my question
>>> is not too dumb. Here it goes:
>>>
>>> I'm using the latest stable Storm and storm-kafka = 0.9.2-incubating.
>>>
>>> I've set up a test cluster using the Wirbelsturm tool with an unchanged
>>> YAML (I just uncommented the Kafka machine).
>>>
>>> Here is the config snippet for my Trident topology:
>>>
>>>     BrokerHosts zk = new ZkHosts("zookeeper1");
>>>     TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zk, "scores");
>>>
>>>     kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>     kafkaConf.fetchSizeBytes = 10000;
>>>     kafkaConf.forceFromStart = true;
>>>
>>>     Config stormConfig = new Config();
>>>     stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP);
>>>     stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE);
>>>     stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET);
>>>     stormConfig.put("couchbase.password", COUCHBASE_PASSWORD);
>>>     // performance settings
>>>     stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100);
>>>     stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);
>>>     stormConfig.setMaxSpoutPending(100000);
>>>
>>>     if (args != null && args.length > 0) {
>>>         StormSubmitter.submitTopologyWithProgressBar(args[0], stormConfig,
>>>                 BuildTridentScoreTopology.build(kafkaConf));
>>>     } else {...}
>>>
>>> Now, I created the 'scores' topic in Kafka and pushed a few test messages
>>> prior to starting the topology, with kafkaConf.forceFromStart = true. The
>>> topology processed those messages just fine and stored them in
>>> TridentState (Couchbase).
>>>
>>> All new messages are simply ignored!
>>>
>>> After redeploying the topology (both with forceFromStart = true and
>>> forceFromStart = false), no more messages are ingested from Kafka.
>>>
>>> Here is the worker log for one topology deployment and a short run:
>>> http://pastie.org/private/4xsk6pijvmulwrcg7zgca
>>>
>>> These are the VMs that host the Storm cluster:
>>>
>>>     10.0.0.241 zookeeper1
>>>     10.0.0.101 supervisor1
>>>     10.0.0.21  kafka1
>>>     10.0.0.251 nimbus1
>>>
>>> Thanks,
>>> Milos
>>
>> --
>> Danijel Schiavuzzi
>>
>> E: [email protected]
>> W: www.schiavuzzi.com
>> T: +385989035562
>> Skype: danijels7
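For reference, the change Danijel suggests amounts to a one-line adjustment of the poster's config. Below is a minimal, hedged sketch of that adjusted snippet; it assumes storm-core and storm-kafka 0.9.2-incubating on the classpath, and the class and hostname values are carried over from the snippet quoted above, not verified independently:

```java
// Sketch of the suggested fix: keep the Kafka spout config as posted,
// but drop maxSpoutPending to a small value. In Trident, maxSpoutPending
// limits the number of in-flight *batches* (not individual tuples, as in
// plain Storm), so very large values can stall the pipeline.
BrokerHosts zk = new ZkHosts("zookeeper1");
TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zk, "scores");
kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());

Config stormConfig = new Config();
stormConfig.setMaxSpoutPending(10); // was 100000 in the original snippet
```

Whether this resolves the "no new messages after redeploy" symptom is not confirmed in the thread; it is the first thing to rule out before digging into the spout's ZooKeeper offset state.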
