Hi all,

I'm pretty new to Storm and Kafka/ZooKeeper, and I hope my question is
not too dumb. Here it goes:

I'm using the latest stable Storm and storm-kafka = 0.9.2-incubating.

I've set up a test cluster using the Wirbelsturm tool with an unchanged
YAML (I just uncommented the Kafka machine).

Here is the config snippet for my Trident topology:

        // Kafka brokers are discovered via ZooKeeper running on zookeeper1
        BrokerHosts zk = new ZkHosts("zookeeper1");
        TridentKafkaConfig kafkaConf = new TridentKafkaConfig(zk, "scores");

        // decode Kafka messages as plain strings
        kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        kafkaConf.fetchSizeBytes = 10000;
        // ignore any previously stored offsets and start from startOffsetTime
        kafkaConf.forceFromStart = true;

        Config stormConfig = new Config();
        stormConfig.put(Config.NIMBUS_HOST, NIMBUS_IP);
        stormConfig.put("couchbase.ip", COUCHBASE_CONSOLE);
        stormConfig.put("couchbase.bucket", COUCHBASE_BUCKET);
        stormConfig.put("couchbase.password", COUCHBASE_PASSWORD);
        // performance settings
        stormConfig.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 100);
        stormConfig.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 100);
        stormConfig.setMaxSpoutPending(100000);

        if (args != null && args.length > 0) {
            StormSubmitter.submitTopologyWithProgressBar(args[0], stormConfig,
                    BuildTridentScoreTopology.build(kafkaConf));
        } else {...}
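
For context, BuildTridentScoreTopology.build() is roughly along the lines of
the sketch below. This is a simplified stand-in rather than my exact code: the
Debug() filter replaces my real parsing step and the Couchbase Trident state,
and the stream name is arbitrary.

import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Debug;

public class BuildTridentScoreTopology {

    public static StormTopology build(TridentKafkaConfig kafkaConf) {
        // opaque transactional Kafka spout driven by the config shown above
        OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(kafkaConf);

        TridentTopology topology = new TridentTopology();
        topology.newStream("kafka-scores", spout)
                // StringScheme emits each Kafka message as a single "str" field;
                // in the real topology this is where the tuples get persisted
                // to the Couchbase-backed Trident state
                .each(new Fields("str"), new Debug());

        return topology.build();
    }
}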

Now, I've created the 'scores' topic in Kafka and pushed a few test messages
prior to starting the topology, with kafkaConf.forceFromStart = true. The
topology processed those messages just fine and stored them in the Trident
state (Couchbase).
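
(For reference, the test messages are just plain strings, hence the
StringScheme above. Pushing them needs nothing more than a minimal producer
along the lines of the sketch below; the broker address kafka1:9092 and the
stock Kafka 0.8 Java producer API are assumptions about the test setup, not
something taken from the topology code.)

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Standalone helper for illustration only; class name and broker address
// (kafka1:9092) are assumptions, not part of the topology.
public class PushTestScores {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "kafka1:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

        // push a handful of plain-string test messages to the 'scores' topic
        for (int i = 0; i < 5; i++) {
            producer.send(new KeyedMessage<String, String>("scores", "test-score-" + i));
        }
        producer.close();
    }
}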

However, all new messages pushed after the topology started are simply ignored!

After redeploying the topology (both with forceFromStart = true and with
forceFromStart = false), no more messages are ingested from Kafka.
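
(My understanding, which may well be wrong, is that forceFromStart only
controls whether previously stored offsets are ignored, and that the actual
starting position comes from startOffsetTime. A hypothetical variant of the
config, which is not what I'm running, would pin that explicitly:)

        // hypothetical variant, not my current config: pin the starting
        // position explicitly instead of relying on the defaults
        kafkaConf.forceFromStart = true;  // ignore offsets stored by earlier runs
        kafkaConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // or EarliestTime()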

Here is the worker log for one topology deployment and a short run:
http://pastie.org/private/4xsk6pijvmulwrcg7zgca

These are the VMs that host the Storm cluster:
10.0.0.241 zookeeper1
10.0.0.101 supervisor1
10.0.0.21 kafka1
10.0.0.251 nimbus1

Thanks,
Milos
