Hi,

Currently I am also working with KafkaSpout, but my bolt is not emitting any
messages. The Kafka topic has plenty of messages: when I consume them with the
Kafka console consumer on the terminal, all of them show up.
But while executing the topology, I get the following logs:

68503 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - Refreshing
partition manager connections
68503 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - Refreshing
partition manager connections
68504 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - Deleted partition
managers: []
68504 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - New partition
managers: []
68504 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - Finished
refreshing
68505 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - Deleted partition
managers: []
68505 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - New partition
managers: []
68505 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - Finished
refreshing
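
For reference, the spout is set up along these lines (the ZooKeeper address,
topic name, zkRoot, and spout id below are placeholders, not my exact values):

    BrokerHosts hosts = new ZkHosts("localhost:2181");
    SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic",
            "/kafkastorm", "my-spout-id");
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);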

Can you please share which storm-kafka dependencies and versions you are
using in your pom.xml?

Any help would be really appreciated.

Thanks,
Nishu


On Wed, Apr 30, 2014 at 1:15 PM, Qian Lin <[email protected]> wrote:

> The BaseBasicBolt instance is executed in BasicBoltExecutor, whose
> execute() method is shown below:
>
>     public void execute(Tuple input) {
>         _collector.setContext(input);
>         try {
>             _bolt.execute(input, _collector);
>             _collector.getOutputter().ack(input);
>         } catch(FailedException e) {
>             if(e instanceof ReportedFailedException) {
>                 _collector.reportError(e);
>             }
>             _collector.getOutputter().fail(input);
>         }
>     }
>
> As can be seen, BaseBasicBolt is only suitable for the pattern of
> reading an input tuple, emitting tuples based on it, and then acking the
> tuple at the end of the execute() method. In other words, the ack() is
> performed by BasicBoltExecutor rather than by BaseBasicBolt itself. Even
> if the BaseBasicBolt instance is a sink bolt (i.e., it emits nothing), it
> will still ack the input each time its execute() finishes.
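>
> For comparison, here is a rough, untested sketch of the equivalent bolt
> written with BaseRichBolt, where acking is manual (declareOutputFields is
> omitted for brevity):
>
>     public class WordBolt extends BaseRichBolt {
>         private OutputCollector _collector;
>
>         public void prepare(Map conf, TopologyContext context,
>                             OutputCollector collector) {
>             _collector = collector;
>         }
>
>         public void execute(Tuple input) {
>             String sentence = input.getString(0);
>             for (String word : sentence.split(" ")) {
>                 // anchor emitted tuples to the input for reliability
>                 if (word.equals("the")) _collector.emit(input, new Values(word));
>             }
>             // must be called explicitly; otherwise the tuple times out
>             _collector.ack(input);
>         }
>     }
>
> With BaseRichBolt you could also skip the ack() so that a tuple eventually
> times out and is replayed, which BasicOutputCollector does not let you do.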
>
> Qian
>
>
> On Tue, Apr 29, 2014 at 11:46 PM, Kashyap Mhaisekar
> <[email protected]> wrote:
>
>> Hi,
>> I have a strange problem with my topology. I use KafkaSpout to read from a
>> Kafka topic, and I find that the topology stops consuming messages after a
>> while, without any apparent reason.
>> I suspect this is related to acking.
>>
>> I use BaseBasicBolt (due to its auto-acking capability), and what I do in
>> the bolt is use a condition to decide whether to emit tuples.
>> My questions are:
>> 1. When I emit from execute(...), does that mean acking happens
>> automatically here?
>> 2. What if I don't emit all tuples? If I use a condition like in the code
>> below, does it mean that acking does not happen when the execute
>> method does not complete?
>> 3. How do I ack from BasicOutputCollector? I mean, OutputCollector has an
>> ack() method used for acking, while BasicOutputCollector has no such
>> method. What do I do to explicitly ack using BasicOutputCollector?
>> 4. If I have a bolt that saves values to a DB and does not emit anything,
>> will it cause a problem?
>>
>> E.g.,
>>
>> public void execute(Tuple tuple, BasicOutputCollector collector) {
>>     String sentence = tuple.getString(0);
>>     for (String word : sentence.split(" ")) {
>>         if (word.equals("the")) collector.emit(new Values(word));
>>     }
>> }
>>
>> Please help!
>>
>> Regards,
>> Kashyap
>>
>
>


-- 
with regards,
Nishu Tayal
