Below is the test code i am trying to do left outer join on multiple streams..
The issue i am getting is something like..
RuntimeException: Expecting 4 lists instead getting 3 lists.
FYI: This works fine for InnerJoin, but failing with the above exception when i
am trying for Left Outer Join.
==========================
package com.trident.fork.joins.test;
/**
* @author dkirankumar
*/
import java.util.ArrayList;
import java.util.List;
import storm.trident.JoinType;
import storm.trident.Stream;
import storm.trident.TridentTopology;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.tuple.Fields;
public class Topology {
public static void main(String[] args) {
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout", new Spout())
.each(new Fields("RequestId"), new PrintFilter("SpoutOutput"));
// Forking the request to two different functions..
Stream firstFunctionStream = stream
.each(new Fields("RequestId"), new FirstFunction(), new Fields("ColumnMapId",
"FFValue"))
.each(new Fields("RequestId", "ColumnMapId", "FFValue"), new
PrintFilter("FirstFunctionOutput"));
Stream secondFunctionStream = stream
.each(new Fields("RequestId"), new SecondFunction(), new Fields("ColumnMapId",
"SFValue"))
.each(new Fields("RequestId", "ColumnMapId", "SFValue"), new
PrintFilter("SecondFunctionOutput"));
Stream thirdFunctionStream = stream
.each(new Fields("RequestId"), new ThirdFunction(), new Fields("ColumnMapId",
"TFValue"))
.each(new Fields("RequestId", "ColumnMapId", "TFValue"), new
PrintFilter("ThirdFunctionOutput"));
// Joining two streams from two different functions..
List<Stream> streams = new ArrayList<Stream>();
streams.add(firstFunctionStream);
streams.add(secondFunctionStream);
streams.add(thirdFunctionStream);
List<Fields> joinFields = new ArrayList<Fields>();
joinFields.add(new Fields("RequestId", "ColumnMapId"));
joinFields.add(new Fields("RequestId", "ColumnMapId"));
joinFields.add(new Fields("RequestId", "ColumnMapId"));
topology.join(streams, joinFields,
new Fields("RequestId", "ColumnMapId", "FFValue", "SFValue", "TFValue"),
JoinType.mixed(JoinType.INNER, JoinType.OUTER)
)
.each(new Fields("RequestId", "ColumnMapId", "FFValue", "SFValue", "TFValue"),
new PrintFilter("JoinedOutput"));
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("MultipleJoinsTest", new Config(), topology.build());
}
}
==========================
package com.trident.fork.joins.test;
/**
* @author dkirankumar
*/
import java.util.Map;
import redis.clients.jedis.JedisPool;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
public class Spout extends BaseRichSpout {
static final long serialVersionUID = 737015318988609460L;
SpoutOutputCollector _collector;
JedisPool pool;
public void open(Map conf, TopologyContext context, SpoutOutputCollector
collector) {
_collector = collector;
}
public void close() {
}
int i = 1;
public void nextTuple() {
if(i > 1){
Utils.sleep(3 * 1000);
return;
}
String requestId = "RequestId_" + i;
_collector.emit(new Values(requestId));
i++;
}
public void ack(Object msgId) {
}
public void fail(Object msgId) {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("RequestId"));
}
public boolean isDistributed() {
return false;
}
}
========================
package com.trident.fork.joins.test;
/**
* @author dkirankumar
*/
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
import backtype.storm.tuple.Values;
class FirstFunction extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String requestId = tuple.getStringByField("RequestId");
collector.emit(new Values("ColumnMapId_1", "Value1"));
collector.emit(new Values("ColumnMapId_2", "Value1"));
collector.emit(new Values("ColumnMapId_3", "Value1"));
collector.emit(new Values("ColumnMapId_4", "Value1"));
}
}
=========================
package com.trident.fork.joins.test;
/**
* @author dkirankumar
*/
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
import backtype.storm.tuple.Values;
class SecondFunction extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String requestId = tuple.getStringByField("RequestId");
collector.emit(new Values("ColumnMapId_1", "Value2"));
collector.emit(new Values("ColumnMapId_2", "Value2"));
}
}
===========================
package com.trident.fork.joins.test;
/**
* @author dkirankumar
*/
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
import backtype.storm.tuple.Values;
class ThirdFunction extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String requestId = tuple.getStringByField("RequestId");
collector.emit(new Values("ColumnMapId_1", "Value3"));
collector.emit(new Values("ColumnMapId_3", "Value3"));
}
}
===========================
package com.trident.fork.joins.test;
/**
* @author dkirankumar
*/
import storm.trident.operation.BaseFilter;
import storm.trident.tuple.TridentTuple;
class PrintFilter extends BaseFilter {
private String name;
public PrintFilter(String name) {
super();
this.name = name;
}
@Override
public boolean isKeep(TridentTuple tuple) {
System.out.println("## PrintFilter [" + name + "]: " + tuple.getValues());
return true;
}
}
=======================