Nishu,
I use
<dependency>
<groupId>net.wurstmeister.storm</groupId>
<artifactId>storm-kafka-0.8-plus</artifactId>
<version>0.4.0</version>
</dependency>
<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.9.0-rc3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.8.0</artifactId>
<version>0.8.1</version>
</dependency>

for kafkaspout

Regards,
kashyap


On Thu, May 1, 2014 at 11:40 AM, Nishu <[email protected]> wrote:

> Hi,
>
> Currently I am also working on KafkaSpout, but my bolt is not emitting any
> message. Kafka Topic has various messages.When I consume messages from
> kafka consumer on terminal, it shows all the messages.
> But while executing Topology, getting following logs :
>
> 68503 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - Refreshing
> partition manager connections
> 68503 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - Refreshing
> partition manager connections
> 68504 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - Deleted
> partition managers: []
> 68504 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - New partition
> managers: []
> 68504 [Thread-18-spout] INFO  storm.kafka.ZkCoordinator - Finished
> refreshing
> 68505 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - Deleted
> partition managers: []
> 68505 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - New partition
> managers: []
> 68505 [Thread-20-spout] INFO  storm.kafka.ZkCoordinator - Finished
> refreshing
>
> Can you please share which storm-kafka dependencies and versions are using
> in your pom.xml?
>
> Any help would be really appreciated.
>
> Thanks,
> Nishu
>
>
> On Wed, Apr 30, 2014 at 1:15 PM, Qian Lin <[email protected]>wrote:
>
>> The BaseBasicBolt instance is executed in BasicBoltExecutor, whose
>> execute() method is shown below:
>>
>>     public void execute(Tuple input) {
>>         _collector.setContext(input);
>>         try {
>>             _bolt.execute(input, _collector);
>>             _collector.getOutputter().ack(input);
>>         } catch(FailedException e) {
>>             if(e instanceof ReportedFailedException) {
>>                 _collector.reportError(e);
>>             }
>>             _collector.getOutputter().fail(input);
>>         }
>>     }
>>
>> As can be seen, the BaseBasicBolt is only useful for the pattern of
>> reading an input tuple, emitting tuples based on it, and then acking the
>> tuple at the end of the execute() method. In other words, the ack() is
>> actualized in the BasicBoltExecutor rather than the BaseBasicBolt. Even
>> if the BaseBasicBolt instance is a sink bolt (i.e., emitting nothing),
>> it will still ack the input after its execute() finishes each time.
>>
>> Qian
>>
>>
>> On Tue, Apr 29, 2014 at 11:46 PM, Kashyap Mhaisekar 
>> <[email protected]>wrote:
>>
>>> Hi,
>>> I have a strange problem my topology. I use KafkaSpout to read from a
>>> kafka topic and i find that the topology stops consuming messages after a
>>> while, without apparent reason.
>>> I suspect this on acking.
>>>
>>> I use BaseBasicBolt (due to auto acking capabilities) and what I do in
>>> bolt is use a condition to emit tuples out.
>>> My question is -
>>> 1. When I emit from an execute(...), does that mean acking happens
>>> automatically here?
>>> 2. What if I dont emit all tuples? If I use a condition like the code
>>> highlighted below, does it mean that acking does not happen when execute
>>> method is not complete?
>>> 3. How do I ack from BasicOutputCollector? I mean, OutputCollector has
>>> method *ack *used for acking while BasicOutputCollector has no such
>>> method. What do I do to explicitly ack using BasicOutputCollector?
>>> 4. If I have a bolt that saves values to DB and does not emit anything,
>>> will it cause an problem?
>>>
>>> E.g.,
>>>
>>> public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>             String sentence = tuple.getString(0);
>>>             for(String word: sentence.split(" ")) {
>>>                 *if (word.equals("the"))* collector.emit(new Values(word));
>>>             }
>>>         }
>>>
>>> Please help!
>>>
>>> Regards,
>>> Kashyap
>>>
>>
>>
>
>
> --
> with regards,
> Nishu Tayal
>

Reply via email to