Hi, Currently I am also working on KafkaSpout, but my bolt is not emitting any message. Kafka Topic has various messages.When I consume messages from kafka consumer on terminal, it shows all the messages. But while executing Topology, getting following logs :
68503 [Thread-18-spout] INFO storm.kafka.ZkCoordinator - Refreshing partition manager connections 68503 [Thread-20-spout] INFO storm.kafka.ZkCoordinator - Refreshing partition manager connections 68504 [Thread-18-spout] INFO storm.kafka.ZkCoordinator - Deleted partition managers: [] 68504 [Thread-18-spout] INFO storm.kafka.ZkCoordinator - New partition managers: [] 68504 [Thread-18-spout] INFO storm.kafka.ZkCoordinator - Finished refreshing 68505 [Thread-20-spout] INFO storm.kafka.ZkCoordinator - Deleted partition managers: [] 68505 [Thread-20-spout] INFO storm.kafka.ZkCoordinator - New partition managers: [] 68505 [Thread-20-spout] INFO storm.kafka.ZkCoordinator - Finished refreshing Can you please share which storm-kafka dependencies and versions are using in your pom.xml? Any help would be really appreciated. Thanks, Nishu On Wed, Apr 30, 2014 at 1:15 PM, Qian Lin <[email protected]>wrote: > The BaseBasicBolt instance is executed in BasicBoltExecutor, whose > execute() method is shown below: > > public void execute(Tuple input) { > _collector.setContext(input); > try { > _bolt.execute(input, _collector); > _collector.getOutputter().ack(input); > } catch(FailedException e) { > if(e instanceof ReportedFailedException) { > _collector.reportError(e); > } > _collector.getOutputter().fail(input); > } > } > > As can be seen, the BaseBasicBolt is only useful for the pattern of > reading an input tuple, emitting tuples based on it, and then acking the > tuple at the end of the execute() method. In other words, the ack() is > actualized in the BasicBoltExecutor rather than the BaseBasicBolt. Even > if the BaseBasicBolt instance is a sink bolt (i.e., emitting nothing), it > will still ack the input after its execute() finishes each time. > > Qian > > > On Tue, Apr 29, 2014 at 11:46 PM, Kashyap Mhaisekar > <[email protected]>wrote: > >> Hi, >> I have a strange problem my topology. I use KafkaSpout to read from a >> kafka topic and i find that the topology stops consuming messages after a >> while, without apparent reason. >> I suspect this on acking. >> >> I use BaseBasicBolt (due to auto acking capabilities) and what I do in >> bolt is use a condition to emit tuples out. >> My question is - >> 1. When I emit from an execute(...), does that mean acking happens >> automatically here? >> 2. What if I dont emit all tuples? If I use a condition like the code >> highlighted below, does it mean that acking does not happen when execute >> method is not complete? >> 3. How do I ack from BasicOutputCollector? I mean, OutputCollector has >> method *ack *used for acking while BasicOutputCollector has no such >> method. What do I do to explicitly ack using BasicOutputCollector? >> 4. If I have a bolt that saves values to DB and does not emit anything, >> will it cause an problem? >> >> E.g., >> >> public void execute(Tuple tuple, BasicOutputCollector collector) { >> String sentence = tuple.getString(0); >> for(String word: sentence.split(" ")) { >> *if (word.equals("the"))* collector.emit(new Values(word)); >> } >> } >> >> Please help! >> >> Regards, >> Kashyap >> > > -- with regards, Nishu Tayal
