Nishu,

I use

<dependency>
    <groupId>net.wurstmeister.storm</groupId>
    <artifactId>storm-kafka-0.8-plus</artifactId>
    <version>0.4.0</version>
</dependency>
<dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.9.0-rc3</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.8.0</artifactId>
    <version>0.8.1</version>
</dependency>

for KafkaSpout.

Regards,
Kashyap

On Thu, May 1, 2014 at 11:40 AM, Nishu <[email protected]> wrote:
> Hi,
>
> Currently I am also working on KafkaSpout, but my bolt is not emitting any
> messages. The Kafka topic has various messages. When I consume messages with
> the Kafka consumer on the terminal, it shows all the messages.
> But while executing the topology, I get the following logs:
>
> 68503 [Thread-18-spout] INFO storm.kafka.ZkCoordinator - Refreshing partition manager connections
> 68503 [Thread-20-spout] INFO storm.kafka.ZkCoordinator - Refreshing partition manager connections
> 68504 [Thread-18-spout] INFO storm.kafka.ZkCoordinator - Deleted partition managers: []
> 68504 [Thread-18-spout] INFO storm.kafka.ZkCoordinator - New partition managers: []
> 68504 [Thread-18-spout] INFO storm.kafka.ZkCoordinator - Finished refreshing
> 68505 [Thread-20-spout] INFO storm.kafka.ZkCoordinator - Deleted partition managers: []
> 68505 [Thread-20-spout] INFO storm.kafka.ZkCoordinator - New partition managers: []
> 68505 [Thread-20-spout] INFO storm.kafka.ZkCoordinator - Finished refreshing
>
> Can you please share which storm-kafka dependencies and versions you are
> using in your pom.xml?
>
> Any help would be really appreciated.
>
> Thanks,
> Nishu
>
>
> On Wed, Apr 30, 2014 at 1:15 PM, Qian Lin <[email protected]> wrote:
>
>> The BaseBasicBolt instance is executed in BasicBoltExecutor, whose
>> execute() method is shown below:
>>
>> public void execute(Tuple input) {
>>     _collector.setContext(input);
>>     try {
>>         _bolt.execute(input, _collector);
>>         _collector.getOutputter().ack(input);
>>     } catch(FailedException e) {
>>         if(e instanceof ReportedFailedException) {
>>             _collector.reportError(e);
>>         }
>>         _collector.getOutputter().fail(input);
>>     }
>> }
>>
>> As can be seen, the BaseBasicBolt is only useful for the pattern of
>> reading an input tuple, emitting tuples based on it, and then acking the
>> tuple at the end of the execute() method.
>> In other words, the ack() is performed in the BasicBoltExecutor rather
>> than in the BaseBasicBolt. Even if the BaseBasicBolt instance is a sink
>> bolt (i.e., emitting nothing), it will still ack the input after its
>> execute() finishes each time.
>>
>> Qian
>>
>>
>> On Tue, Apr 29, 2014 at 11:46 PM, Kashyap Mhaisekar
>> <[email protected]> wrote:
>>
>>> Hi,
>>> I have a strange problem with my topology. I use KafkaSpout to read from
>>> a Kafka topic and I find that the topology stops consuming messages after
>>> a while, without apparent reason.
>>> I suspect the cause is acking.
>>>
>>> I use BaseBasicBolt (due to its auto-acking capabilities) and what I do
>>> in the bolt is use a condition to emit tuples out.
>>> My questions are:
>>> 1. When I emit from an execute(...), does that mean acking happens
>>> automatically here?
>>> 2. What if I don't emit all tuples? If I use a condition like the code
>>> highlighted below, does it mean that acking does not happen when the
>>> execute method is not complete?
>>> 3. How do I ack from BasicOutputCollector? I mean, OutputCollector has a
>>> method *ack* used for acking while BasicOutputCollector has no such
>>> method. What do I do to explicitly ack using BasicOutputCollector?
>>> 4. If I have a bolt that saves values to a DB and does not emit anything,
>>> will it cause a problem?
>>>
>>> E.g.,
>>>
>>> public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>     String sentence = tuple.getString(0);
>>>     for(String word: sentence.split(" ")) {
>>>         *if (word.equals("the"))* collector.emit(new Values(word));
>>>     }
>>> }
>>>
>>> Please help!
>>>
>>> Regards,
>>> Kashyap
>>>
>>
>>
>
>
> --
> with regards,
> Nishu Tayal
>
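
To make Qian's point about the executor concrete, here is a minimal sketch of the same wrapper pattern in plain Java, with no Storm dependency. The names AutoAckSketch, BasicBolt, BasicCollector and Executor, and the local FailedException, are hypothetical stand-ins invented for illustration and are not Storm's classes. Only the try/ack, catch/fail shape is taken from the execute() method quoted in the thread.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: hypothetical stand-ins, not Storm's API.
final class AutoAckSketch {

    /** Stand-in for the exception a bolt throws to have its input failed. */
    static class FailedException extends RuntimeException {
        FailedException(String msg) { super(msg); }
    }

    /** Collector handed to the bolt: it can emit, but has no ack() method. */
    interface BasicCollector {
        void emit(String value);
    }

    /** Stand-in for a basic bolt: user code sees the input and a collector. */
    interface BasicBolt {
        void execute(String input, BasicCollector collector);
    }

    /** Wrapper playing the role of the executor: it acks or fails, not the bolt. */
    static final class Executor {
        private final BasicBolt bolt;
        final List<String> log = new ArrayList<>();

        Executor(BasicBolt bolt) { this.bolt = bolt; }

        void execute(String input) {
            try {
                bolt.execute(input, value -> log.add("emit:" + value));
                log.add("ack:" + input);   // reached whether or not anything was emitted
            } catch (FailedException e) {
                log.add("fail:" + input);  // only an explicit failure skips the ack
            }
        }
    }

    /** The conditional emitter from the thread: emits only the word "the". */
    static final BasicBolt FILTER = (sentence, collector) -> {
        for (String word : sentence.split(" ")) {
            if (word.equals("the")) {
                collector.emit(word);
            }
        }
    };

    /** A sink bolt: emits nothing (imagine a database write here). */
    static final BasicBolt SINK = (input, collector) -> { };

    /** A bolt that rejects every input. */
    static final BasicBolt FAILING = (input, collector) -> {
        throw new FailedException("rejected");
    };

    /** Runs each input through a fresh executor and returns the event log. */
    static List<String> run(BasicBolt bolt, String... inputs) {
        Executor executor = new Executor(bolt);
        for (String input : inputs) {
            executor.execute(input);
        }
        return executor.log;
    }

    public static void main(String[] args) {
        System.out.println(run(FILTER, "the cat", "no match"));
        System.out.println(run(SINK, "row"));
        System.out.println(run(FAILING, "bad"));
    }
}
```

Running main prints [emit:the, ack:the cat, ack:no match], then [ack:row], then [fail:bad]. In this sketch the input "no match" is acked even though nothing was emitted for it, and the sink bolt's input is acked as well, which matches what Qian describes for questions 2 and 4. The bolt never calls ack itself, which is one way to read why BasicOutputCollector has no such method.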
