Hello,
Is there a reason why I shouldn’t merge a transactional and a non-transactional 
trident stream?

Thanks,
Jacques

Begin forwarded message:

> From: Jacques <[email protected]>
> Subject: Merging trident streams blocks the FixedBatchSpout
> Date: July 28, 2014 at 8:22:50 PM PDT
> To: [email protected]
> 
> Hi guys,
> I need some help understanding why merging two streams blocks one of the 
> spouts of class FixedBatchSpout.
> I’ve been stuck 2 days on this, help would be much appreciated (see details 
> below).
> 
> Thanks,
> Jacques
> 
> 
> Short Description: I’m trying to merge two streams s1 and s2, but calling 
> topology.merge(s1, s2) blocks the FixedBatchSpout from which s1 originates, 
> whereas the BaseRichSpout from s2 seems to work properly.
> 
> Details: In the below main method, just adding the line topology.merge(s1, 
> s2); prevents the FixedBatchSpout to emit past its first batch. This happens 
> with multireduce as well.
> 
> FixedBatchSpout spout1 = new FixedBatchSpout(new Fields("sentence"), 2,
>                               new Values("the cow jumped over the moon"),
>                               new Values("the man went to the store and 
> bought some candy”));
> 
> FixedLoopSpout spout2 = new FixedLoopSpout(new Fields("sentence"),
>                               new Values("THE COW JUMPED OVER THE MOON"),
>                               new Values("THE MAN WENT TO THE STORE AND 
> BOUGHT SOME CANDY"));
> 
> Stream s1 = topology.newStream("hello", spout1);
> Stream s2 = topology.newStream("world", spout2);
> topology.merge(s1, s2);
> 
> 
> public class FixedLoopSpout extends BaseRichSpout {
> 
>       Values[] values;
>       List<Values> loop = new LinkedList<Values>();
>       Iterator<Values> head;
>       private SpoutOutputCollector collector;
>       private final Fields outputFields;
> 
>       private long emitted = 0;
> 
>       public FixedLoopSpout(Fields outputFields, Values... values) {
>               this.outputFields = outputFields;
>               this.values = values;
>       }
> 
>       public void open(Map conf, TopologyContext context, 
> SpoutOutputCollector collector) {
>               this.collector = collector;
>               for (Values value: this.values) {
>                       this.loop.add(value);
>               }
>               this.head = this.loop.iterator();
>       }
> 
>       public void nextTuple() {
>               if (!this.head.hasNext()) {
>                       // wrap
>                       this.head = this.loop.iterator();
>               }
>               this.collector.emit(this.head.next(), this.emitted++);
>               try {
>                       Thread.sleep(100);
>               } catch (InterruptedException e) {
>                       e.printStackTrace();
>               }
>       }
> 
>       public void declareOutputFields(OutputFieldsDeclarer declarer) {
>               declarer.declare(this.outputFields);
>       }
> }

Reply via email to