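I believe you should be using one of the Trident Kafka spout variants if you're building a Trident topology, not the plain Storm KafkaSpout. Your stack trace shows Trident wrapping the plain KafkaSpout in RichSpoutBatchExecutor and calling KafkaSpout.open() from inside emitBatch(), which is where TopologyContext.registerMetric fails.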
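
Something along these lines should do it. This is only a rough, untested sketch: it assumes the Trident spout classes that ship in the storm.kafka.trident package of storm-kafka-0.8-plus, and the ZooKeeper address, topic name, stream id, and the class and method names of the sketch itself are placeholders I made up, so adjust them to your setup.

```java
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.BrokerHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.Stream;
import storm.trident.TridentTopology;

// Hypothetical helper class, for illustration only.
public class TridentKafkaSketch {

    // Attaches a Trident-aware Kafka spout to the given topology.
    public static Stream kafkaStream(TridentTopology topology) {
        // Assumption: ZooKeeper used by the Kafka brokers listens here.
        BrokerHosts hosts = new ZkHosts("127.0.0.1:2181");

        // Assumption: "my-topic" stands in for your actual Kafka topic.
        TridentKafkaConfig config = new TridentKafkaConfig(hosts, "my-topic");

        // Decode each Kafka message as a single string.
        config.scheme = new SchemeAsMultiScheme(new StringScheme());

        // Opaque variant; storm.kafka.trident.TransactionalTridentKafkaSpout
        // would be the alternative for a strictly transactional spout.
        OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(config);

        // "kafka-stream" is a placeholder stream (transaction) id.
        return topology.newStream("kafka-stream", spout);
    }
}
```

With a spout like this, Trident should no longer need to wrap a plain KafkaSpout in RichSpoutBatchExecutor, so open() would not be called from emitBatch().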

On Tuesday, May 27, 2014, Romain Leroux <[email protected]> wrote:

> Hi,
>
> First of all thanks to @miguno for his amazing work on kafka-storm-starter.
>
> I am trying to add a memcached state to it based on :
> https://github.com/nathanmarz/trident-memcached
>
> More particularly I'd like to test the full stack:
> Kafka->Storm->TransactionalState(Memcached) with Trident.
> I am firstly focusing on a normal KafkaSpout (as the default settings in
> kafka-storm-starter) and an nonTransactional state.
>
> However I faced the unexpected following Exception during my first tests :
>
> ./sbt run
> ...
> 9117 [Thread-10] INFO backtype.storm.daemon.worker - Worker 155f0d72-2e99-495b-9cf5-2523076b3d73 for storm kafka-storm-starter_memcached-1-1401177768 on 3928bc48-49e2-4e88-bb51-eae5642ee963:1025 has finished loading
> 9118 [Thread-68-__system] INFO backtype.storm.daemon.executor - Prepared bolt __system:(-1)
> 9151 [Thread-58-spout0] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
> 9171 [Thread-58-spout0] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
> 9199 [Thread-58-spout0] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=127.0.0.1:9092}}
> 9200 [Thread-58-spout0] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
> 9228 [Thread-58-spout0] ERROR backtype.storm.util - Async loop died!
> java.lang.RuntimeException: java.lang.RuntimeException: TopologyContext.registerMetric can only be called from within overridden IBolt::prepare() or ISpout::open() method.
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
>     at backtype.storm.daemon.executor$eval5170$fn__5171$fn__5183$fn__5230.invoke(executor.clj:745) ~[na:na]
>     at backtype.storm.util$async_loop$fn__390.invoke(util.clj:433) ~[na:na]
>     at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.4.0.jar:na]
>     at java.lang.Thread.run(Thread.java:724) ~[na:1.7.0_25]
> Caused by: java.lang.RuntimeException: TopologyContext.registerMetric can only be called from within overridden IBolt::prepare() or ISpout::open() method.
>     at backtype.storm.task.TopologyContext.registerMetric(TopologyContext.java:230) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
>     at storm.kafka.KafkaSpout.open(KafkaSpout.java:80) ~[storm-kafka-0.8-plus_2.10-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
>     at storm.trident.spout.RichSpoutBatchExecutor$RichSpoutEmitter.emitBatch(RichSpoutBatchExecutor.java:105) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
>     at storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
>     at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
>     at backtype.storm.daemon.executor$eval5170$fn__5171$tuple_action_fn__5173.invoke(executor.clj:630) ~[na:na]
>     at backtype.storm.daemon.executor$mk_task_receiver$fn__5091.invoke(executor.clj:398) ~[na:na]
>     at backtype.storm.disruptor$clojure_handler$reify__1894.onEvent(disruptor.clj:58) ~[na:na]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor
>
> If someone has some pointers on that issue, they are more than welcome.
>
> Here is the code :
>
> https://github.com/lerouxrgd/kafka-storm-starter/tree/plugin_trient-memcached
>

--
Danijel Schiavuzzi

E: [email protected]
W: www.schiavuzzi.com
T: +385989035562
Skype: danijels7
