Hi all,
I'm pretty new to Storm and Kafka/ZooKeeper, and I hope my question is
not too dumb. Here it goes:
I'm using the latest stable Storm with storm-kafka = 0.9.2-incubating.
I've set up a test cluster using the Wirbelsturm tool with an unchanged
YAML (I just uncommented the Kafka machine).
Here is the config snippet for my Trident topology:
BrokerHosts zk = new ZkHosts("zookeeper1");
TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zk, "scores");
kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConf.fetchSizeBytes = 10000;
kafkaConf.forceFromStart = true;

Config stormConfig = new Config();
stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP);
stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE);
stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET);
stormConfig.put("couchbase.password", COUCHBASE_PASSWORD);

// performance settings
stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100);
stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);
stormConfig.setMaxSpoutPending(100000);

if (args != null && args.length > 0) {
    StormSubmitter.submitTopologyWithProgressBar(args[0], stormConfig,
            BuildTridentScoreTopology.build(kafkaConf));
} else {...}
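For context, here is roughly what BuildTridentScoreTopology.build does (a sketch only; the spout txId, the field names, and the actual Couchbase persistence step are elided here):

```java
import backtype.storm.generated.StormTopology;
import storm.kafka.trident.TransactionalTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.TridentTopology;

public class BuildTridentScoreTopology {
    public static StormTopology build(TridentKafkaConfig kafkaConf) {
        TridentTopology topology = new TridentTopology();
        // "kafka-spout" is the txId under which the Trident Kafka spout
        // tracks its consumed offsets in ZooKeeper across batches.
        topology.newStream("kafka-spout",
                new TransactionalTridentKafkaSpout(kafkaConf));
        // ... each(...) / persistentAggregate(...) into the Couchbase
        // TridentState omitted ...
        return topology.build();
    }
}
```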
Now, I created the 'scores' topic in Kafka and pushed a few test messages
prior to starting the topology, with kafkaConf.forceFromStart = true. The
topology processed those messages just fine and stored them in the
Trident state (Couchbase).
But all new messages are simply ignored!
After redeploying the topology (both with forceFromStart = true and with
forceFromStart = false), no more messages are ingested from Kafka.
Here is the worker log for one topology deployment and a short run:
http://pastie.org/private/4xsk6pijvmulwrcg7zgca
These are the VMs that host this Storm cluster:
10.0.0.241 zookeeper1
10.0.0.101 supervisor1
10.0.0.21 kafka1
10.0.0.251 nimbus1
Thanks,
Milos