I'd double-check the Kafka producer to make sure those messages are really getting into the right Kafka topic. Also, try enabling Config.setDebug(true) and monitoring the Kafka spout's activity in the worker logs. You should always set setMaxSpoutPending: it is unset by default, so nothing limits how far ahead the spout emits and you risk the internal queues exploding.
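For illustration, here is a minimal sketch of a pre-submit sanity check along those lines. It assumes that Storm's Config (which is a HashMap<String, Object>) stores setDebug(true) under the key "topology.debug" and setMaxSpoutPending(n) under "topology.max.spout.pending". So that it runs without the Storm jar, it uses a plain HashMap with those literal keys. The TopologyConfigCheck.warnings helper is hypothetical and is not part of Storm.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopologyConfigCheck {
    // Assumed to match the strings behind Storm's Config.TOPOLOGY_DEBUG
    // and Config.TOPOLOGY_MAX_SPOUT_PENDING constants.
    static final String TOPOLOGY_DEBUG = "topology.debug";
    static final String TOPOLOGY_MAX_SPOUT_PENDING = "topology.max.spout.pending";

    // Hypothetical helper: collects warnings instead of failing hard.
    static List<String> warnings(Map<String, Object> conf) {
        List<String> out = new ArrayList<String>();
        Object pending = conf.get(TOPOLOGY_MAX_SPOUT_PENDING);
        if (!(pending instanceof Number)) {
            // Missing (null) or non-numeric: treat it as unset, i.e. unthrottled.
            out.add("max spout pending is unset");
        } else if (((Number) pending).intValue() <= 0) {
            out.add("max spout pending must be positive");
        }
        if (!Boolean.TRUE.equals(conf.get(TOPOLOGY_DEBUG))) {
            out.add("debug logging is off");
        }
        return out;
    }

    public static void main(String[] args) {
        // Mirrors conf.setDebug(true); conf.setMaxSpoutPending(10);
        // on a Storm Config object.
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put(TOPOLOGY_DEBUG, true);
        conf.put(TOPOLOGY_MAX_SPOUT_PENDING, 10);
        System.out.println(warnings(conf));                              // prints []
        System.out.println(warnings(new HashMap<String, Object>()));     // two warnings
    }
}
```

Running such a check just before StormSubmitter.submitTopology... catches the "left at the default" case that the thread ran into.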
On Tuesday, July 8, 2014, Miloš Solujić wrote:
> Yep, pretty much sure. Via the internal kafka-producer.sh, the same method
> used to produce the initial messages (before the first launch of the
> topology), which got consumed and processed just fine.
>
> As for maxSpoutPending, first I tried with 10, then removed it (left the
> default value).
>
> On Tue, Jul 8, 2014 at 6:31 PM, Danijel Schiavuzzi wrote:
>
>> Are you sure you are producing new messages into the same Kafka
>> topic? What number did you set maxSpoutPending to?
>>
>> On Tuesday, July 8, 2014, Miloš Solujić wrote:
>>
>>> Thanks, Danijel, for your quick suggestion.
>>>
>>> I tried lowering and then removing all the performance settings (those
>>> were left over from load testing on one machine).
>>>
>>> Still the same result: no matter what, new messages are not taken from
>>> Kafka after the topology is redeployed.
>>>
>>> On Tue, Jul 8, 2014 at 6:15 PM, Danijel Schiavuzzi wrote:
>>>
>>>> Try lowering setMaxSpoutPending(100000) to a much lower value (like
>>>> 10). In Trident, setMaxSpoutPending refers to the number of batches, not
>>>> tuples as in plain Storm. Values that are too high may cause blockages
>>>> like the one you describe.
>>>>
>>>> On Tuesday, July 8, 2014, Miloš Solujić wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm pretty new to Storm and Kafka/ZooKeeper, and I hope my
>>>>> question is not too dumb.
>>>>> Here it goes:
>>>>>
>>>>> I'm using the latest stable Storm and storm-kafka (0.9.2-incubating).
>>>>>
>>>>> I've set up a test cluster using the Wirbelsturm tool with an unchanged
>>>>> YAML file (I just uncommented the Kafka machine).
>>>>>
>>>>> Here is the config snippet for my Trident topology:
>>>>>
>>>>>   BrokerHosts zk = new ZkHosts("zookeeper1");
>>>>>   TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zk, "scores");
>>>>>
>>>>>   kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>>   kafkaConf.fetchSizeBytes = 10000;
>>>>>   kafkaConf.forceFromStart = true;
>>>>>
>>>>>   Config stormConfig = new Config();
>>>>>   stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP);
>>>>>   stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE);
>>>>>   stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET);
>>>>>   stormConfig.put("couchbase.password", COUCHBASE_PASSWORD);
>>>>>   // performance settings
>>>>>   stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100);
>>>>>   stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);
>>>>>   stormConfig.setMaxSpoutPending(100000);
>>>>>
>>>>>   if (args != null && args.length > 0) {
>>>>>       StormSubmitter.submitTopologyWithProgressBar(args[0], stormConfig,
>>>>>               BuildTridentScoreTopology.build(kafkaConf));
>>>>>   } else {...}
>>>>>
>>>>> Now, I've created the 'scores' topic in Kafka and pushed a few test
>>>>> messages prior to starting the topology, with kafkaConf.forceFromStart =
>>>>> true. The topology processed those messages just fine and stored them
>>>>> in the Trident state (Couchbase).
>>>>>
>>>>> All new messages are simply ignored!
>>>>>
>>>>> After redeploying the topology (both with forceFromStart = true and
>>>>> forceFromStart = false), no more messages are ingested from Kafka.
>>>>>
>>>>> Here is the worker log for one topology deployment and a short run:
>>>>> http://pastie.org/private/4xsk6pijvmulwrcg7zgca
>>>>>
>>>>> These are the VMs that host this Storm cluster:
>>>>> 10.0.0.241 zookeeper1
>>>>> 10.0.0.101 supervisor1
>>>>> 10.0.0.21 kafka1
>>>>> 10.0.0.251 nimbus1
>>>>>
>>>>> Thanks,
>>>>> Milos
>>>>
>>>> --
>>>> Danijel Schiavuzzi
>>>>
>>>> W: www.schiavuzzi.com
>>>> T: +385989035562
>>>> Skype: danijels7
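A quick back-of-the-envelope calculation shows why it matters that, in Trident, setMaxSpoutPending counts batches rather than tuples. This is a sketch under stated assumptions. If every batch is full, the tuples in flight are bounded by maxSpoutPending times the batch size. The 100 tuples per batch used below is purely illustrative and is borrowed from the RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF value in the config above. Whether that cap actually governs the Kafka spout's batches is not established in this thread. The class and method names are hypothetical.

```java
public class TridentPendingBound {
    // Trident's max spout pending limits concurrent batches, not tuples.
    // Upper bound on tuples in flight, assuming every batch holds at most
    // tuplesPerBatch tuples. Widened to long to avoid int overflow.
    static long maxInFlightTuples(int maxSpoutPendingBatches, int tuplesPerBatch) {
        return (long) maxSpoutPendingBatches * tuplesPerBatch;
    }

    public static void main(String[] args) {
        int tuplesPerBatch = 100; // illustrative assumption, not a measured value
        System.out.println(maxInFlightTuples(100_000, tuplesPerBatch)); // 10000000
        System.out.println(maxInFlightTuples(10, tuplesPerBatch));      // 1000
    }
}
```

Under these assumptions, setMaxSpoutPending(100000) allows up to ten million tuples in flight, while a value of 10 allows at most a thousand.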
