Hi Kashyap, Currently I am also working on KafkaSpout, but my bolt is not emitting any message. Kafka Topic has various messages. While executing Topology, getting following logs :
68503 [Thread-18-spout] INFO storm.kafka.ZkCoordinator - Refreshing partition manager connections 68503 [Thread-20-spout] INFO storm.kafka.ZkCoordinator - Refreshing partition manager connections 68504 [Thread-18-spout] INFO storm.kafka.ZkCoordinator - Deleted partition managers: [] 68504 [Thread-18-spout] INFO storm.kafka.ZkCoordinator - New partition managers: [] 68504 [Thread-18-spout] INFO storm.kafka.ZkCoordinator - Finished refreshing 68505 [Thread-20-spout] INFO storm.kafka.ZkCoordinator - Deleted partition managers: [] 68505 [Thread-20-spout] INFO storm.kafka.ZkCoordinator - New partition managers: [] 68505 [Thread-20-spout] INFO storm.kafka.ZkCoordinator - Finished refreshing When I consume messages from kafka consumer on terminal, it shows all the messages. Can you please share which storm-kafka dependencies and versions are using in your pom.xml? Any help would be really appreciated. Thanks, Nishu On Tue, Apr 29, 2014 at 9:16 PM, Kashyap Mhaisekar <[email protected]>wrote: > Hi, > I have a strange problem my topology. I use KafkaSpout to read from a > kafka topic and i find that the topology stops consuming messages after a > while, without apparent reason. > I suspect this on acking. > > I use BaseBasicBolt (due to auto acking capabilities) and what I do in > bolt is use a condition to emit tuples out. > My question is - > 1. When I emit from an execute(...), does that mean acking happens > automatically here? > 2. What if I dont emit all tuples? If I use a condition like the code > highlighted below, does it mean that acking does not happen when execute > method is not complete? > 3. How do I ack from BasicOutputCollector? I mean, OutputCollector has > method *ack *used for acking while BasicOutputCollector has no such > method. What do I do to explicitly ack using BasicOutputCollector? > 4. If I have a bolt that saves values to DB and does not emit anything, > will it cause an problem? > > E.g., > > public void execute(Tuple tuple, BasicOutputCollector collector) { > String sentence = tuple.getString(0); > for(String word: sentence.split(" ")) { > *if (word.equals("the"))* collector.emit(new Values(word)); > } > } > > Please help! > > Regards, > Kashyap >
