You can join like this in the main function.

      Stream joinStreamInner =
          topology.join(streams, joinFields,
              new Fields("RequestId", "ColumnMapId", "FFValue", "SFValue", "TFValue"),
              //JoinType.mixed(JoinType.INNER, JoinType.OUTER))
              JoinType.INNER)
          .each(new Fields("RequestId", "ColumnMapId", "FFValue", "SFValue", "TFValue"),
              new PrintFilter("JoinedOutputInner"));

      Stream joinStreamOuter =
          topology.join(streams, joinFields,
              new Fields("RequestId", "ColumnMapId", "FFValue", "SFValue", "TFValue"),
              //JoinType.mixed(JoinType.INNER, JoinType.OUTER))
              JoinType.OUTER)
          .each(new Fields("RequestId", "ColumnMapId", "FFValue", "SFValue", "TFValue"),
              new PrintFilter("JoinedOutputOuter"));

      topology.join(
              joinStreamInner,
              new Fields("RequestId", "ColumnMapId", "FFValue", "SFValue", "TFValue"),
              joinStreamOuter,
              new Fields("RequestId", "ColumnMapId", "FFValue", "SFValue", "TFValue"),
              new Fields("RequestId", "ColumnMapId", "FFValue", "SFValue", "TFValue"),
              //JoinType.mixed(JoinType.INNER, JoinType.OUTER))
              JoinType.INNER)
          .each(new Fields("RequestId", "ColumnMapId", "FFValue", "SFValue", "TFValue"),
              new PrintFilter("JoinedOutput"));

But the outer join is not giving the expected result:
it still behaves like an inner join.

I am getting this output.

## PrintFilter [SpoutOutput]: [RequestId_1]
## PrintFilter [FirstFunctionOutput]: [RequestId_1, ColumnMapId_1, Value1]
## PrintFilter [FirstFunctionOutput]: [RequestId_1, ColumnMapId_2, Value1]
## PrintFilter [FirstFunctionOutput]: [RequestId_1, ColumnMapId_3, Value1]
## PrintFilter [FirstFunctionOutput]: [RequestId_1, ColumnMapId_4, Value1]
## PrintFilter [SecondFunctionOutput]: [RequestId_1, ColumnMapId_1, Value2]
## PrintFilter [SecondFunctionOutput]: [RequestId_1, ColumnMapId_2, Value2]
## PrintFilter [ThirdFunctionOutput]: [RequestId_1, ColumnMapId_1, Value3]
## PrintFilter [ThirdFunctionOutput]: [RequestId_1, ColumnMapId_3, Value3]
## PrintFilter [JoinedOutputOuter]: [RequestId_1, ColumnMapId_1, Value1, Value2, Value3]
## PrintFilter [JoinedOutputInner]: [RequestId_1, ColumnMapId_1, Value1, Value2, Value3]
## PrintFilter [JoinedOutput]: [RequestId_1, ColumnMapId_1, Value1, Value2, Value3]
## PrintFilter [SpoutOutput]: [RequestId_2]
## PrintFilter [FirstFunctionOutput]: [RequestId_2, ColumnMapId_1, Value1]
## PrintFilter [FirstFunctionOutput]: [RequestId_2, ColumnMapId_2, Value1]
## PrintFilter [FirstFunctionOutput]: [RequestId_2, ColumnMapId_3, Value1]
## PrintFilter [FirstFunctionOutput]: [RequestId_2, ColumnMapId_4, Value1]
## PrintFilter [SecondFunctionOutput]: [RequestId_2, ColumnMapId_1, Value2]
## PrintFilter [SecondFunctionOutput]: [RequestId_2, ColumnMapId_2, Value2]
## PrintFilter [ThirdFunctionOutput]: [RequestId_2, ColumnMapId_1, Value3]
## PrintFilter [ThirdFunctionOutput]: [RequestId_2, ColumnMapId_3, Value3]
## PrintFilter [JoinedOutputOuter]: [RequestId_2, ColumnMapId_1, Value1, Value2, Value3]
## PrintFilter [JoinedOutputInner]: [RequestId_2, ColumnMapId_1, Value1, Value2, Value3]
## PrintFilter [JoinedOutput]: [RequestId_2, ColumnMapId_1, Value1, Value2, Value3]
## PrintFilter [SpoutOutput]: [RequestId_3]
## PrintFilter [FirstFunctionOutput]: [RequestId_3, ColumnMapId_1, Value1]
## PrintFilter [FirstFunctionOutput]: [RequestId_3, ColumnMapId_2, Value1]
## PrintFilter [FirstFunctionOutput]: [RequestId_3, ColumnMapId_3, Value1]
## PrintFilter [FirstFunctionOutput]: [RequestId_3, ColumnMapId_4, Value1]
## PrintFilter [SecondFunctionOutput]: [RequestId_3, ColumnMapId_1, Value2]
## PrintFilter [SecondFunctionOutput]: [RequestId_3, ColumnMapId_2, Value2]
## PrintFilter [ThirdFunctionOutput]: [RequestId_3, ColumnMapId_1, Value3]
## PrintFilter [ThirdFunctionOutput]: [RequestId_3, ColumnMapId_3, Value3]
## PrintFilter [JoinedOutputOuter]: [RequestId_3, ColumnMapId_1, Value1, Value2, Value3]
## PrintFilter [JoinedOutputInner]: [RequestId_3, ColumnMapId_1, Value1, Value2, Value3]
## PrintFilter [JoinedOutput]: [RequestId_3, ColumnMapId_1, Value1, Value2, Value3]
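
To make the difference concrete, here is a minimal plain-Java sketch (no Storm dependency) of inner versus outer join semantics over the same sample tuples. The class JoinSemanticsSketch and its methods stream() and join() are hypothetical names used only for illustration. This is not how Trident implements joins; it assumes conventional SQL-style semantics, where a side with no matching tuple is filled with null.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class JoinSemanticsSketch {

    // Builds one stream as key -> value. The string key stands in for the
    // (RequestId, ColumnMapId) join fields; RequestId_1 is hard-coded here.
    public static Map<String, String> stream(String value, String... columnMapIds) {
        Map<String, String> tuples = new LinkedHashMap<>();
        for (String id : columnMapIds) {
            tuples.put("RequestId_1/" + id, value);
        }
        return tuples;
    }

    // Joins all streams on the key. With outer == false a key must appear in
    // every stream (inner join); with outer == true every key is kept and
    // missing sides become null (full outer join).
    public static List<List<String>> join(List<Map<String, String>> streams, boolean outer) {
        // Collect every key seen in any stream, in first-seen order.
        Set<String> keys = new LinkedHashSet<>();
        for (Map<String, String> s : streams) {
            keys.addAll(s.keySet());
        }

        List<List<String>> rows = new ArrayList<>();
        for (String key : keys) {
            List<String> row = new ArrayList<>();
            row.add(key);
            boolean presentEverywhere = true;
            for (Map<String, String> s : streams) {
                String value = s.get(key); // null when this stream has no tuple for the key
                if (value == null) {
                    presentEverywhere = false;
                }
                row.add(value);
            }
            if (outer || presentEverywhere) {
                rows.add(row);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // Same tuples as the FirstFunction/SecondFunction/ThirdFunction output.
        List<Map<String, String>> streams = new ArrayList<>();
        streams.add(stream("Value1", "ColumnMapId_1", "ColumnMapId_2", "ColumnMapId_3", "ColumnMapId_4"));
        streams.add(stream("Value2", "ColumnMapId_1", "ColumnMapId_2"));
        streams.add(stream("Value3", "ColumnMapId_1", "ColumnMapId_3"));

        System.out.println("inner: " + join(streams, false));
        System.out.println("outer: " + join(streams, true));
    }
}
```

With these tuples the inner join keeps only the ColumnMapId_1 row, while the outer join keeps all four ColumnMapIds and leaves SFValue or TFValue as null where a stream has no match. The JoinedOutputOuter lines above show only the ColumnMapId_1 row, which is the inner-join result.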

On 5/12/14, Kiran Kumar <[email protected]> wrote:
> Below is the test code with which I am trying to do a left outer join on
> multiple streams.
>
> The issue I am getting is:
> RuntimeException: Expecting 4 lists instead getting 3 lists.
>
> FYI: this works fine for an inner join, but fails with the above exception
> when I try a left outer join.
>
> ==========================
>
> package com.trident.fork.joins.test;
> /**
>  * @author dkirankumar
>  */
> import java.util.ArrayList;
> import java.util.List;
>
> import storm.trident.JoinType;
> import storm.trident.Stream;
> import storm.trident.TridentTopology;
> import backtype.storm.Config;
> import backtype.storm.LocalCluster;
> import backtype.storm.tuple.Fields;
>
> public class Topology {
>
>     public static void main(String[] args) {
>         TridentTopology topology = new TridentTopology();
>
>         Stream stream = topology.newStream("spout", new Spout())
>             .each(new Fields("RequestId"), new PrintFilter("SpoutOutput"));
>
>         // Forking the request to three different functions.
>         Stream firstFunctionStream = stream
>             .each(new Fields("RequestId"), new FirstFunction(),
>                 new Fields("ColumnMapId", "FFValue"))
>             .each(new Fields("RequestId", "ColumnMapId", "FFValue"),
>                 new PrintFilter("FirstFunctionOutput"));
>
>         Stream secondFunctionStream = stream
>             .each(new Fields("RequestId"), new SecondFunction(),
>                 new Fields("ColumnMapId", "SFValue"))
>             .each(new Fields("RequestId", "ColumnMapId", "SFValue"),
>                 new PrintFilter("SecondFunctionOutput"));
>
>         Stream thirdFunctionStream = stream
>             .each(new Fields("RequestId"), new ThirdFunction(),
>                 new Fields("ColumnMapId", "TFValue"))
>             .each(new Fields("RequestId", "ColumnMapId", "TFValue"),
>                 new PrintFilter("ThirdFunctionOutput"));
>
>         // Joining the three streams from the three different functions.
>         List<Stream> streams = new ArrayList<Stream>();
>         streams.add(firstFunctionStream);
>         streams.add(secondFunctionStream);
>         streams.add(thirdFunctionStream);
>
>         List<Fields> joinFields = new ArrayList<Fields>();
>         joinFields.add(new Fields("RequestId", "ColumnMapId"));
>         joinFields.add(new Fields("RequestId", "ColumnMapId"));
>         joinFields.add(new Fields("RequestId", "ColumnMapId"));
>
>         topology.join(streams, joinFields,
>                 new Fields("RequestId", "ColumnMapId", "FFValue", "SFValue", "TFValue"),
>                 JoinType.mixed(JoinType.INNER, JoinType.OUTER))
>             .each(new Fields("RequestId", "ColumnMapId", "FFValue", "SFValue", "TFValue"),
>                 new PrintFilter("JoinedOutput"));
>
>         LocalCluster cluster = new LocalCluster();
>         cluster.submitTopology("MultipleJoinsTest", new Config(),
>             topology.build());
>     }
> }
>
>
> ==========================
>
> package com.trident.fork.joins.test;
> /**
>  * @author dkirankumar
>  */
> import java.util.Map;
>
> import redis.clients.jedis.JedisPool;
> import backtype.storm.spout.SpoutOutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseRichSpout;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Values;
> import backtype.storm.utils.Utils;
>
> public class Spout extends BaseRichSpout {
>
>     static final long serialVersionUID = 737015318988609460L;
>
>     SpoutOutputCollector _collector;
>     JedisPool pool;
>
>     public void open(Map conf, TopologyContext context,
>             SpoutOutputCollector collector) {
>         _collector = collector;
>     }
>
>     public void close() {
>     }
>
>     int i = 1;
>
>     public void nextTuple() {
>         if (i > 1) {
>             Utils.sleep(3 * 1000);
>             return;
>         }
>
>         String requestId = "RequestId_" + i;
>         _collector.emit(new Values(requestId));
>         i++;
>     }
>
>     public void ack(Object msgId) {
>     }
>
>     public void fail(Object msgId) {
>     }
>
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         declarer.declare(new Fields("RequestId"));
>     }
>
>     public boolean isDistributed() {
>         return false;
>     }
> }
>
> ========================
>
> package com.trident.fork.joins.test;
>
> /**
>  * @author dkirankumar
>  */
> import storm.trident.operation.BaseFunction;
> import storm.trident.operation.TridentCollector;
> import storm.trident.tuple.TridentTuple;
> import backtype.storm.tuple.Values;
>
> class FirstFunction extends BaseFunction {
>
>     @Override
>     public void execute(TridentTuple tuple, TridentCollector collector) {
>         String requestId = tuple.getStringByField("RequestId");
>         collector.emit(new Values("ColumnMapId_1", "Value1"));
>         collector.emit(new Values("ColumnMapId_2", "Value1"));
>         collector.emit(new Values("ColumnMapId_3", "Value1"));
>         collector.emit(new Values("ColumnMapId_4", "Value1"));
>     }
> }
>
> =========================
>
> package com.trident.fork.joins.test;
>
> /**
>  * @author dkirankumar
>  */
> import storm.trident.operation.BaseFunction;
> import storm.trident.operation.TridentCollector;
> import storm.trident.tuple.TridentTuple;
> import backtype.storm.tuple.Values;
>
> class SecondFunction extends BaseFunction {
>
>     @Override
>     public void execute(TridentTuple tuple, TridentCollector collector) {
>         String requestId = tuple.getStringByField("RequestId");
>         collector.emit(new Values("ColumnMapId_1", "Value2"));
>         collector.emit(new Values("ColumnMapId_2", "Value2"));
>     }
> }
>
> ===========================
>
> package com.trident.fork.joins.test;
>
> /**
>  * @author dkirankumar
>  */
> import storm.trident.operation.BaseFunction;
> import storm.trident.operation.TridentCollector;
> import storm.trident.tuple.TridentTuple;
> import backtype.storm.tuple.Values;
>
> class ThirdFunction extends BaseFunction {
>
>     @Override
>     public void execute(TridentTuple tuple, TridentCollector collector) {
>         String requestId = tuple.getStringByField("RequestId");
>         collector.emit(new Values("ColumnMapId_1", "Value3"));
>         collector.emit(new Values("ColumnMapId_3", "Value3"));
>     }
> }
>
> ===========================
>
> package com.trident.fork.joins.test;
> /**
>  * @author dkirankumar
>  */
> import storm.trident.operation.BaseFilter;
> import storm.trident.tuple.TridentTuple;
>
> class PrintFilter extends BaseFilter {
>
>     private String name;
>
>     public PrintFilter(String name) {
>         super();
>         this.name = name;
>     }
>
>     @Override
>     public boolean isKeep(TridentTuple tuple) {
>         System.out.println("## PrintFilter [" + name + "]: " + tuple.getValues());
>         return true;
>     }
>
> }
>
> =======================
>
