Below is the test code in which I am trying to do a left outer join on multiple streams.

The issue I am getting is something like:
RuntimeException: Expecting 4 lists instead getting 3 lists.

FYI: This works fine for an inner join, but it fails with the above exception
when I try a left outer join.

==========================

package com.trident.fork.joins.test;
/**
 * @author dkirankumar
 */
import java.util.ArrayList;
import java.util.List;

import storm.trident.JoinType;
import storm.trident.Stream;
import storm.trident.TridentTopology;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.tuple.Fields;

public class Topology {

public static void main(String[] args) {
TridentTopology topology = new TridentTopology();

Stream stream = topology.newStream("spout", new Spout())
.each(new Fields("RequestId"), new PrintFilter("SpoutOutput"));
// Forking the request to two different functions..
Stream firstFunctionStream = stream
.each(new Fields("RequestId"), new FirstFunction(), new Fields("ColumnMapId", 
"FFValue"))
.each(new Fields("RequestId", "ColumnMapId", "FFValue"), new 
PrintFilter("FirstFunctionOutput"));
Stream secondFunctionStream = stream
.each(new Fields("RequestId"), new SecondFunction(), new Fields("ColumnMapId", 
"SFValue"))
.each(new Fields("RequestId", "ColumnMapId", "SFValue"), new 
PrintFilter("SecondFunctionOutput"));
Stream thirdFunctionStream = stream
.each(new Fields("RequestId"), new ThirdFunction(), new Fields("ColumnMapId", 
"TFValue"))
.each(new Fields("RequestId", "ColumnMapId", "TFValue"), new 
PrintFilter("ThirdFunctionOutput"));
// Joining two streams from two different functions..
List<Stream> streams = new ArrayList<Stream>();
streams.add(firstFunctionStream);
streams.add(secondFunctionStream);
streams.add(thirdFunctionStream);
List<Fields> joinFields = new ArrayList<Fields>();
joinFields.add(new Fields("RequestId", "ColumnMapId"));
joinFields.add(new Fields("RequestId", "ColumnMapId"));
joinFields.add(new Fields("RequestId", "ColumnMapId"));
topology.join(streams, joinFields, 
new Fields("RequestId", "ColumnMapId", "FFValue", "SFValue", "TFValue"), 
JoinType.mixed(JoinType.INNER, JoinType.OUTER)
)
.each(new Fields("RequestId", "ColumnMapId", "FFValue", "SFValue", "TFValue"), 
new PrintFilter("JoinedOutput"));
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("MultipleJoinsTest", new Config(), topology.build());
}
}


==========================


package com.trident.fork.joins.test;
/**
 * @author dkirankumar
 */
import java.util.Map;

import redis.clients.jedis.JedisPool;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
 * Spout that emits one tuple with the single field "RequestId" and then idles.
 */
public class Spout extends BaseRichSpout {

    static final long serialVersionUID = 737015318988609460L;

    /** Pause, in milliseconds, taken on each call once the single request has been emitted. */
    private static final long IDLE_SLEEP_MILLIS = 3 * 1000;

    SpoutOutputCollector _collector;
    JedisPool pool;

    /** Sequence number appended to the next request id; starts at 1. */
    int i = 1;

    /** Stores the collector used later by {@link #nextTuple()}. */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this._collector = collector;
    }

    /** Nothing to release. */
    public void close() {
    }

    /**
     * Emits "RequestId_" + i while i is at most 1 and then increments i;
     * on every later call it only sleeps.
     */
    public void nextTuple() {
        if (i <= 1) {
            _collector.emit(new Values("RequestId_" + i));
            i++;
        } else {
            Utils.sleep(IDLE_SLEEP_MILLIS);
        }
    }

    /** Acknowledgements are ignored. */
    public void ack(Object msgId) {
    }

    /** Failures are ignored. */
    public void fail(Object msgId) {
    }

    /** Declares the single output field "RequestId". */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("RequestId"));
    }

    /** @return always {@code false} */
    public boolean isDistributed() {
        return false;
    }
}

========================

package com.trident.fork.joins.test;

/**
 * @author dkirankumar
 */
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
import backtype.storm.tuple.Values;

/**
 * Trident function that emits four (column-map id, "Value1") pairs for every
 * input tuple: ColumnMapId_1 through ColumnMapId_4.
 */
class FirstFunction extends BaseFunction {

    /**
     * Reads the "RequestId" field and emits the four fixed value pairs.
     *
     * @param tuple     input tuple; its "RequestId" field is read as a String
     * @param collector receives the emitted values
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // The request id is looked up but does not affect the emitted values.
        tuple.getStringByField("RequestId");
        for (int n = 1; n <= 4; n++) {
            collector.emit(new Values("ColumnMapId_" + n, "Value1"));
        }
    }
}

=========================

package com.trident.fork.joins.test;

/**
 * @author dkirankumar
 */
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
import backtype.storm.tuple.Values;

/**
 * Trident function that emits two (column-map id, "Value2") pairs for every
 * input tuple: ColumnMapId_1 and ColumnMapId_2.
 */
class SecondFunction extends BaseFunction {

    /**
     * Reads the "RequestId" field and emits the two fixed value pairs.
     *
     * @param tuple     input tuple; its "RequestId" field is read as a String
     * @param collector receives the emitted values
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // The request id is looked up but does not affect the emitted values.
        tuple.getStringByField("RequestId");
        String[] columnMapIds = {"ColumnMapId_1", "ColumnMapId_2"};
        for (String columnMapId : columnMapIds) {
            collector.emit(new Values(columnMapId, "Value2"));
        }
    }
}

===========================

package com.trident.fork.joins.test;

/**
 * @author dkirankumar
 */
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
import backtype.storm.tuple.Values;

/**
 * Trident function that emits two (column-map id, "Value3") pairs for every
 * input tuple: ColumnMapId_1 and ColumnMapId_3.
 */
class ThirdFunction extends BaseFunction {

    /**
     * Reads the "RequestId" field and emits the two fixed value pairs.
     *
     * @param tuple     input tuple; its "RequestId" field is read as a String
     * @param collector receives the emitted values
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // The request id is looked up but does not affect the emitted values.
        tuple.getStringByField("RequestId");
        String[] columnMapIds = {"ColumnMapId_1", "ColumnMapId_3"};
        for (String columnMapId : columnMapIds) {
            collector.emit(new Values(columnMapId, "Value3"));
        }
    }
}

===========================

package com.trident.fork.joins.test;
/**
 * @author dkirankumar
 */
import storm.trident.operation.BaseFilter;
import storm.trident.tuple.TridentTuple;

class PrintFilter extends BaseFilter {

private String name;

public PrintFilter(String name) {
super();
this.name = name;
}

@Override
public boolean isKeep(TridentTuple tuple) {
System.out.println("## PrintFilter [" + name + "]: " + tuple.getValues());
return true;
}

}

=======================

Reply via email to