Hi guys,
I need some help understanding why merging two streams blocks one of the spouts 
of class FixedBatchSpout.
I’ve been stuck 2 days on this, help would be much appreciated (see details 
below).

Thanks,
Jacques


Short Description: I’m trying to merge two streams s1 and s2, but calling 
topology.merge(s1, s2) blocks the FixedBatchSpout from which s1 originates, 
whereas the BaseRichSpout from s2 seems to work properly.

Details: In the below main method, just adding the line topology.merge(s1, s2); 
prevents the FixedBatchSpout to emit past its first batch. This happens with 
multireduce as well.

FixedBatchSpout spout1 = new FixedBatchSpout(new Fields("sentence"), 2,
                                new Values("the cow jumped over the moon"),
                                new Values("the man went to the store and 
bought some candy”));

FixedLoopSpout spout2 = new FixedLoopSpout(new Fields("sentence"),
                                new Values("THE COW JUMPED OVER THE MOON"),
                                new Values("THE MAN WENT TO THE STORE AND 
BOUGHT SOME CANDY"));

Stream s1 = topology.newStream("hello", spout1);
Stream s2 = topology.newStream("world", spout2);
topology.merge(s1, s2);


public class FixedLoopSpout extends BaseRichSpout {

        Values[] values;
        List<Values> loop = new LinkedList<Values>();
        Iterator<Values> head;
        private SpoutOutputCollector collector;
        private final Fields outputFields;

        private long emitted = 0;

        public FixedLoopSpout(Fields outputFields, Values... values) {
                this.outputFields = outputFields;
                this.values = values;
        }

        public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
                this.collector = collector;
                for (Values value: this.values) {
                        this.loop.add(value);
                }
                this.head = this.loop.iterator();
        }

        public void nextTuple() {
                if (!this.head.hasNext()) {
                        // wrap
                        this.head = this.loop.iterator();
                }
                this.collector.emit(this.head.next(), this.emitted++);
                try {
                        Thread.sleep(100);
                } catch (InterruptedException e) {
                        e.printStackTrace();
                }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(this.outputFields);
        }
}

Reply via email to