This is interesting. I'm not sure why that would happen.

On Friday, March 13, 2015 9:58 AM, Jake Dodd <[email protected]> wrote:
 Looks like you’re using OpaqueTridentKafkaSpout?
topology.max.spout.pending, for a Trident spout, refers to the number of 
in-flight batches. When you set this number too high, OpaqueTridentKafkaSpout 
sort of freaks out and emits the same tuples in several batches.
The batches aren’t failing; the spout is actually duplicating the batches. If 
you set a breakpoint on a component directly downstream from your spout and 
debug, you’ll see the same tuples emitted, but with monotonically increasing 
batch IDs. The TransactionalTridentKafkaSpout doesn’t seem to have this problem.
Which is a long way of saying that you need to decrease your 
topology.max.spout.pending when using OpaqueTridentKafkaSpout. Start really 
small (1 or 2) and then increase it from there, measuring throughput as you go.
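For example, setting that cap in the topology config looks something like this (a minimal sketch; the buildConf helper and class name here are just for illustration — with storm-core on the classpath you'd typically call Config.setMaxSpoutPending on a Config object instead, since Config is itself a Map that Storm reads this key from):

```java
import java.util.HashMap;
import java.util.Map;

public class MaxSpoutPendingConfig {

    // "topology.max.spout.pending" is the key Storm reads
    // (Config.TOPOLOGY_MAX_SPOUT_PENDING). For a Trident spout it caps
    // the number of in-flight batches, not individual tuples.
    public static Map<String, Object> buildConf(int maxPending) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.max.spout.pending", maxPending);
        return conf;
    }

    public static void main(String[] args) {
        // Start very small (1 or 2) and raise it while measuring throughput.
        Map<String, Object> conf = buildConf(2);
        System.out.println(conf.get("topology.max.spout.pending")); // prints 2
    }
}
```

You'd pass this conf when submitting the topology, then experiment upward from there.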
I encountered the same problem, and this was the best explanation I could find: 
https://groups.google.com/forum/#!topic/storm-user/c9sjrGhM_7o.
This should probably be highlighted in the storm-kafka README or something.
Best,
Jake

On Mar 13, 2015, at 12:22 AM, Qian, Shilei <[email protected]> wrote:
After I remove the storm configuration “topology.max.spout.pending”, the 
trident workload runs well. But I am still a little confused about whether I 
should set this parameter to improve parallelism when processing a trident 
topology.

From: Qian, Shilei [mailto:[email protected]]
Sent: Tuesday, March 10, 2015 3:36 PM
To: [email protected]
Subject: Trident reads from Kafka brokers, processes multiple times

Hi,

I’m running a Storm Trident workload, fetching messages from Kafka brokers. 
The Storm version is 0.9.3. I send just 64 records to Kafka; however, Trident 
processes these records multiple times. Some code is given at the end; thanks 
for reading, and I sincerely await your help.

    BrokerHosts brokerHosts = new ZkHosts(zkHost);
    TridentKafkaConfig tridentKafkaConfig =
        new TridentKafkaConfig(brokerHosts, topic, consumerGroup);
    tridentKafkaConfig.fetchSizeBytes = 10 * 1024;
    tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(tridentKafkaConfig);

    topology
        .newStream("bg0", spout)
        .each(spout.getOutputFields(), new Identity(), new Fields("tuple"));

    public static class Identity extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            collector.emit(new Values(tuple.getValues()));
        }
    }

Regards,
Qian, Shilei


