Hello, Is there a reason why I shouldn’t merge a transactional and a non-transactional trident stream?
Thanks, Jacques Begin forwarded message: > From: Jacques <[email protected]> > Subject: Merging trident streams blocks the FixedBatchSpout > Date: July 28, 2014 at 8:22:50 PM PDT > To: [email protected] > > Hi guys, > I need some help understanding why merging two streams blocks one of the > spouts of class FixedBatchSpout. > I’ve been stuck 2 days on this, help would be much appreciated (see details > below). > > Thanks, > Jacques > > > Short Description: I’m trying to merge two streams s1 and s2, but calling > topology.merge(s1, s2) blocks the FixedBatchSpout from which s1 originates, > whereas the BaseRichSpout from s2 seems to work properly. > > Details: In the below main method, just adding the line topology.merge(s1, > s2); prevents the FixedBatchSpout to emit past its first batch. This happens > with multireduce as well. > > FixedBatchSpout spout1 = new FixedBatchSpout(new Fields("sentence"), 2, > new Values("the cow jumped over the moon"), > new Values("the man went to the store and > bought some candy”)); > > FixedLoopSpout spout2 = new FixedLoopSpout(new Fields("sentence"), > new Values("THE COW JUMPED OVER THE MOON"), > new Values("THE MAN WENT TO THE STORE AND > BOUGHT SOME CANDY")); > > Stream s1 = topology.newStream("hello", spout1); > Stream s2 = topology.newStream("world", spout2); > topology.merge(s1, s2); > > > public class FixedLoopSpout extends BaseRichSpout { > > Values[] values; > List<Values> loop = new LinkedList<Values>(); > Iterator<Values> head; > private SpoutOutputCollector collector; > private final Fields outputFields; > > private long emitted = 0; > > public FixedLoopSpout(Fields outputFields, Values... values) { > this.outputFields = outputFields; > this.values = values; > } > > public void open(Map conf, TopologyContext context, > SpoutOutputCollector collector) { > this.collector = collector; > for (Values value: this.values) { > this.loop.add(value); > } > this.head = this.loop.iterator(); > } > > public void nextTuple() { > if (!this.head.hasNext()) { > // wrap > this.head = this.loop.iterator(); > } > this.collector.emit(this.head.next(), this.emitted++); > try { > Thread.sleep(100); > } catch (InterruptedException e) { > e.printStackTrace(); > } > } > > public void declareOutputFields(OutputFieldsDeclarer declarer) { > declarer.declare(this.outputFields); > } > }
