Your data is replicated because of the all grouping. I assume you have more than one task in the kafka and hdfs bolts. If you subscribe via fields grouping (or shuffle grouping, or local-or-shuffle grouping) and both bolts are subscribed to your deserializer, then exactly one task in the hdfs bolt and one task in the kafka bolt will get each tuple. Which tasks receive a tuple is determined by the grouping you choose. Since you subscribe using all grouping, every task in the hdfs bolt and every task in the kafka bolt gets its own copy of your tuple, so with N tasks per bolt each message is written N times to each output.
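To make the difference concrete, here is a toy Java model of the two groupings. It does not use Storm at all: the Grouping and Subscriber types, the route() method and the parallelism of 3 tasks per bolt are hypothetical, chosen only to count how many copies of one deserialized tuple reach the output bolts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Toy model of Storm stream groupings; NOT Storm's API. The names Grouping,
// Subscriber and route() are hypothetical, for illustration only.
public class GroupingDemo {

    enum Grouping { ALL, SHUFFLE }

    // One bolt subscribed to the deserializer: its name, task count and grouping.
    static final class Subscriber {
        final String name;
        final int tasks;
        final Grouping grouping;

        Subscriber(String name, int tasks, Grouping grouping) {
            this.name = name;
            this.tasks = tasks;
            this.grouping = grouping;
        }
    }

    // Returns the "bolt[task]" receivers of ONE tuple emitted by the deserializer.
    static List<String> route(List<Subscriber> subscribers, Random rng) {
        List<String> receivers = new ArrayList<>();
        for (Subscriber s : subscribers) {
            if (s.grouping == Grouping.ALL) {
                // All grouping: every task of the bolt receives its own copy.
                for (int t = 0; t < s.tasks; t++) {
                    receivers.add(s.name + "[" + t + "]");
                }
            } else {
                // Shuffle grouping: exactly one randomly chosen task receives it.
                receivers.add(s.name + "[" + rng.nextInt(s.tasks) + "]");
            }
        }
        return receivers;
    }

    public static void main(String[] args) {
        Random rng = new Random(42);
        int tasksPerBolt = 3; // hypothetical parallelism of each output bolt

        List<Subscriber> allGrouped = List.of(
                new Subscriber("hdfsBolt", tasksPerBolt, Grouping.ALL),
                new Subscriber("kafkaBolt", tasksPerBolt, Grouping.ALL));
        List<Subscriber> shuffleGrouped = List.of(
                new Subscriber("hdfsBolt", tasksPerBolt, Grouping.SHUFFLE),
                new Subscriber("kafkaBolt", tasksPerBolt, Grouping.SHUFFLE));

        // 3 copies to HDFS + 3 copies to Kafka for a single input message.
        System.out.println("all grouping:     " + route(allGrouped, rng).size() + " deliveries");
        // 1 copy to HDFS + 1 copy to Kafka.
        System.out.println("shuffle grouping: " + route(shuffleGrouped, rng).size() + " deliveries");
    }
}
```

Running it prints 6 deliveries for all grouping and 2 for shuffle grouping. In the real topology the corresponding change would be made where the HDFS and Kafka bolts are declared: subscribe them with shuffleGrouping(...) (or localOrShuffleGrouping(...) / fieldsGrouping(...)) on the deserializer's component id instead of allGrouping(...).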
On Mon, Jul 7, 2014 at 2:56 PM, Max Evers <[email protected]> wrote:
> From: Evers, Maxwell C
> Sent: Monday, July 07, 2014 1:51 PM
> To: '[email protected]'
> Subject: storm-kafka integration duplication of messages
>
> Working with a KafkaSpout feeding a Storm topology, and dumping back out of the topology to Kafka and HDFS (duplicating sending to AllGrouping), I am seeing a great deal of duplicate messages coming through the flow.
>
> The topology is fairly trivial in scope currently, as it is just KafkaSpout -> Bolt to deserialize a Java object and emit a field as string -> (HdfsBolt from the ptgoetz project AND KafkaBolt to outgoing queue).
>
> The latter portion, the HdfsBolt/output to Kafka, doesn’t seem to be the source of the problem, as we’re seeing duplicate messages incoming in the deserialization, so they just suffer the symptoms.
>
> The deserialization bolt is so simple that I don’t think it could be the problem… it gets a tuple, gets the byte array, deserializes, emits, acks.
>
> So that leaves the KafkaSpout as missing some vital configuration to prevent the massive duplication of incoming messages we are seeing. The configuration is almost identical to the few examples I’ve seen around, in the Michael Noll tutorial as well as in Nathan Marz's git.
>
> //******************************************************************
> ZkHosts hosts = new ZkHosts(brokerZkStr);
> SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot,
>         spoutConfigID);
> spoutConfig.scheme = new RawMultiScheme();
> KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
> topologyBuilder.setSpout(spoutID, kafkaSpout, spout_parallelism_hint);
> //*****************************************************************
>
> Can anyone explain what factors would cause the duplication of messages that I’m seeing?
> Should I be looking more into Kafka than into the storm-kafka spout?
>
> Regards,
> Maxwell E.
