Looks like you’re using OpaqueTridentKafkaSpout?

topology.max.spout.pending, for a Trident spout, refers to the number of 
in-flight batches. When you set this number too high, OpaqueTridentKafkaSpout 
misbehaves and re-emits the same tuples across several batches.

The batches aren’t failing; the spout is actually duplicating the batches. If 
you set a breakpoint on a component directly downstream from your spout and 
debug, you’ll see the same tuples emitted, but with monotonically increasing 
batch IDs. The TransactionalTridentKafkaSpout doesn’t seem to have this problem.
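You can reproduce what the debugger shows with a plain-Java sketch (no Storm classes; the class and method names below are made up for illustration): the same tuple values observed under monotonically increasing batch IDs. Grouping the observed (batchId, value) pairs by value makes the duplication obvious.

```java
import java.util.*;

public class DuplicateBatchCheck {
    // Group observed (batchId, tupleValue) pairs by value; a value mapped to
    // more than one batch ID was re-emitted by the spout.
    public static Map<String, Set<Long>> batchesPerValue(List<Map.Entry<Long, String>> observed) {
        Map<String, Set<Long>> result = new HashMap<>();
        for (Map.Entry<Long, String> e : observed) {
            result.computeIfAbsent(e.getValue(), k -> new TreeSet<>()).add(e.getKey());
        }
        return result;
    }

    public static void main(String[] args) {
        // Simulate what a breakpoint directly downstream of the spout shows:
        // the same tuples ("a", "b", "c") emitted under batch IDs 1, 2 and 3.
        List<Map.Entry<Long, String>> observed = new ArrayList<>();
        for (long batchId = 1; batchId <= 3; batchId++) {
            for (String v : new String[]{"a", "b", "c"}) {
                observed.add(new AbstractMap.SimpleEntry<>(batchId, v));
            }
        }
        // Each value ends up associated with all three batch IDs.
        System.out.println(batchesPerValue(observed));
    }
}
```

With a healthy spout each value would map to exactly one batch ID; here every value maps to all three.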

Which is a long way of saying that you need to decrease your 
topology.max.spout.pending when using OpaqueTridentKafkaSpout. Start really 
small (1 or 2), then experiment by increasing it from there, measuring the 
throughput at each step.
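For reference, the setting is just a map entry; in Storm's Java API you would normally call conf.setMaxSpoutPending(n) on a backtype.storm.Config (which extends HashMap) before submitting the topology. A stdlib-only sketch of the same thing, with a made-up class name:

```java
import java.util.HashMap;
import java.util.Map;

public class SpoutPendingConfig {
    // backtype.storm.Config extends HashMap<String, Object>, so putting the
    // raw key below is equivalent to conf.setMaxSpoutPending(maxPending).
    public static Map<String, Object> withMaxSpoutPending(int maxPending) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.max.spout.pending", maxPending);
        return conf;
    }

    public static void main(String[] args) {
        // Start small (1 or 2) and raise it only while throughput keeps improving.
        System.out.println(withMaxSpoutPending(2));
    }
}
```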

I encountered the same problem, and this was the best explanation I could find: 
https://groups.google.com/forum/#!topic/storm-user/c9sjrGhM_7o

This should probably be highlighted in the storm-kafka README or something.

Best

Jake

> On Mar 13, 2015, at 12:22 AM, Qian, Shilei <[email protected]> wrote:
> 
> After I remove the storm configuration “topology.max.spout.pending”, the 
> trident workload runs well.
>  
> But I still get a little confused if I should set this parameter to improve 
> parallelism when processing trident topology.
>  
> From: Qian, Shilei [mailto:[email protected]] 
> Sent: Tuesday, March 10, 2015 3:36 PM
> To: [email protected]
> Subject: Trident read from Kafka brokers, processes multiple times
>  
> Hi,
>  
> I’m running Storm Trident workload, fetching message from Kafka brokers. 
> Storm version is 0.9.3.
>  
> I send just 64 records to Kafka; however, Trident processes these records 
> multiple times.
>  
> Some code is given at the end. Thanks for reading, and I look forward to 
> your help.
>  
>  
>     BrokerHosts brokerHosts = new ZkHosts(zkHost);
>     TridentKafkaConfig tridentKafkaConfig =
>         new TridentKafkaConfig(brokerHosts, topic, consumerGroup);
>     tridentKafkaConfig.fetchSizeBytes = 10 * 1024;
>     tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>     OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(tridentKafkaConfig);
>  
>     topology
>       .newStream("bg0", spout)
>       .each(spout.getOutputFields(), new Identity(), new Fields("tuple"));
>  
>     public static class Identity extends BaseFunction {
>       @Override
>       public void execute(TridentTuple tuple, TridentCollector collector) {
>         collector.emit(new Values(tuple.getValues()));
>       }
>     }
>  
>  
> Regards
> Qian, Shilei
