This is interesting; I'm not sure why that would happen.
On Friday, March 13, 2015 9:58 AM, Jake Dodd <[email protected]> wrote:
Looks like you’re using OpaqueTridentKafkaSpout?
topology.max.spout.pending, for a Trident spout, refers to the number of
in-flight batches. When you set this number too high, OpaqueTridentKafkaSpout
sort of freaks out and emits the same tuples in several batches.
The batches aren’t failing; the spout is actually duplicating the batches. If
you set a breakpoint on a component directly downstream from your spout and
debug, you’ll see the same tuples emitted, but with monotonically increasing
batch IDs. The TransactionalTridentKafkaSpout doesn’t seem to have this problem.
Which is a long way of saying that you need to decrease your
topology.max.spout.pending when using OpaqueTridentKafkaSpout. Start really
small (1 or 2) and then experiment by increasing it from there, and measuring
the throughput.
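To make the advice concrete, here is a minimal sketch of what setting a small max-spout-pending looks like. Note this is an illustration, not code from the thread: Storm's `Config` class is essentially a `HashMap<String, Object>`, so a plain map with the `topology.max.spout.pending` key stands in for calling `conf.setMaxSpoutPending(2)` on a real `backtype.storm.Config` before submitting the topology.

```java
import java.util.HashMap;
import java.util.Map;

public class TridentPendingConfig {

    // Builds a topology config map. For an opaque Trident Kafka spout,
    // "topology.max.spout.pending" caps the number of *batches* in flight
    // at once -- start at 1 or 2 and only increase it while measuring
    // throughput, as described above.
    public static Map<String, Object> buildConf() {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.max.spout.pending", 2);
        return conf;
    }

    public static void main(String[] args) {
        // On a real cluster this map would be passed to
        // StormSubmitter.submitTopology(name, conf, topology.build()).
        System.out.println(buildConf());
    }
}
```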
I encountered the same problem, and this was the best explanation I could find:
https://groups.google.com/forum/#!topic/storm-user/c9sjrGhM_7o.
This should probably be highlighted in the storm-kafka README or something.
Best
Jake
On Mar 13, 2015, at 12:22 AM, Qian, Shilei <[email protected]> wrote:
After I removed the Storm configuration “topology.max.spout.pending”, the Trident workload runs well. But I'm still a little confused about whether I should set this parameter to improve parallelism when running a Trident topology.

From: Qian, Shilei [mailto:[email protected]]
Sent: Tuesday, March 10, 2015 3:36 PM
To: [email protected]
Subject: Trident reads from Kafka brokers, processes records multiple times

Hi,

I'm running a Storm Trident workload, fetching messages from Kafka brokers. The Storm version is 0.9.3. I send just 64 records to Kafka; however, Trident processes these records multiple times. Some code is given below. Thanks for reading, and I sincerely look forward to your help.

BrokerHosts brokerHosts = new ZkHosts(zkHost);
TridentKafkaConfig tridentKafkaConfig =
    new TridentKafkaConfig(brokerHosts, topic, consumerGroup);
tridentKafkaConfig.fetchSizeBytes = 10 * 1024;
tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(tridentKafkaConfig);

topology
    .newStream("bg0", spout)
    .each(spout.getOutputFields(), new Identity(), new Fields("tuple"));

public static class Identity extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        collector.emit(new Values(tuple.getValues()));
    }
}

Regards,
Qian, Shilei