package com.trident.fork.joins.test;
/**
 * @author dkirankumar
 */
import java.util.ArrayList;
import java.util.List;

import storm.trident.JoinType;
import storm.trident.Stream;
import storm.trident.TridentTopology;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.tuple.Fields;

/**
 * Demo Trident topology that forks one spout stream into three function
 * branches and joins the branches back together on
 * ("RequestId", "ColumnMapId").
 *
 * The spout, the three functions and the print filter are project classes
 * declared elsewhere; only their wiring is defined here.
 */
public class Topology {

	/**
	 * Builds the fork/join topology and submits it to an in-process
	 * {@link LocalCluster} under the name "MultipleJoinsTest".
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {
		TridentTopology topology = new TridentTopology();

		// Source stream; every branch below starts from this same stream.
		// PrintFilter is presumably a pass-through filter that logs tuples under
		// the given label -- confirm against its definition.
		Stream stream = topology.newStream("spout", new Spout())
		.each(new Fields("RequestId"), new PrintFilter("SpoutOutput"));
		
		// Forking the request to three different functions. Each function reads
		// "RequestId" and appends "ColumnMapId" plus its own value field.
		Stream firstFunctionStream = stream
				.each(new Fields("RequestId"), new FirstFunction(), new Fields("ColumnMapId", "FFValue"))
				.each(new Fields("RequestId", "ColumnMapId", "FFValue"), new PrintFilter("FirstFunctionOutput"));
		
		Stream secondFunctionStream = stream
				.each(new Fields("RequestId"), new SecondFunction(), new Fields("ColumnMapId", "SFValue"))
				.each(new Fields("RequestId", "ColumnMapId", "SFValue"), new PrintFilter("SecondFunctionOutput"));
		
		Stream thirdFunctionStream = stream
				.each(new Fields("RequestId"), new ThirdFunction(), new Fields("ColumnMapId", "TFValue"))
				.each(new Fields("RequestId", "ColumnMapId", "TFValue"), new PrintFilter("ThirdFunctionOutput"));
		
		// Joining the three streams produced by the three functions.
		List<Stream> streams = new ArrayList<Stream>();
		streams.add(firstFunctionStream);
		streams.add(secondFunctionStream);
		streams.add(thirdFunctionStream);
		
		// One join-key entry per stream, in the same order as 'streams'.
		List<Fields> joinFields = new ArrayList<Fields>();
		joinFields.add(new Fields("RequestId", "ColumnMapId"));
		joinFields.add(new Fields("RequestId", "ColumnMapId"));
		joinFields.add(new Fields("RequestId", "ColumnMapId"));
		
		// The mixed join-type list needs one entry per joined stream, in stream
		// order. Only two were supplied for three streams before, so the third
		// stream had no join type to look up.
		// NOTE(review): OUTER for the third stream follows the existing
		// INNER-then-OUTER pattern; switch it to INNER if a missing third-function
		// result should drop the joined row instead of yielding a null "TFValue".
		topology.join(streams, joinFields, 
				new Fields("RequestId", "ColumnMapId", "FFValue", "SFValue", "TFValue"), 
				JoinType.mixed(JoinType.INNER, JoinType.OUTER, JoinType.OUTER)
				)
		.each(new Fields("RequestId", "ColumnMapId", "FFValue", "SFValue", "TFValue"), new PrintFilter("JoinedOutput"));
		
		// Run in-process with a default Config. No shutdown is requested here.
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("MultipleJoinsTest", new Config(), topology.build());
	}
}
