package testkafkastorm;

import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.TridentKafkaStateFactory;
import storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.trident.selector.DefaultTopicSelector;
import storm.trident.Stream;
import storm.trident.TridentTopology;
import storm.trident.testing.FixedBatchSpout;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

 class PrintBolt extends BaseRichBolt{
	
	private OutputCollector collector;
	
	ArrayList<String> words = new ArrayList<String>();
	
	
	 public void prepare(
	        Map                     map,
	        TopologyContext         topologyContext,
	        OutputCollector         outputCollector)
	    { }
	 
	 
	 public void execute(Tuple tuple)
	    {
	      String word = tuple.getString(0);
	      words.add(word);
	      System.out.println(word);
	    }
	 
	 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
	    {
	      //outputFieldsDeclarer.declare(new Fields("word"));
	    }
	 
	 @Override
	 public void cleanup() {
		    System.out.println("--- FINAL COUNTS ---");
		    
		    for (String key : words) {
		      System.out.println(key + " : " + (key));
		    }
		    System.out.println("--------------");    
		    
	 }
		  
}

public class TestKafkaStorm {
	
	String zkConnect, topic, topologyname;

	public void runTopologyLocally() throws Exception{
		String zkRoot = "/kafka-root"; 
		String zkSpoutId = "kafka-storm-starter";
		//ZkHosts zkHosts = new ZkHosts(zkConnect);
		
		BrokerHosts hosts = new ZkHosts(zkConnect);
		
		SpoutConfig kafkaConfig = new SpoutConfig(new ZkHosts(zkConnect),topic, "/"+topic,UUID.randomUUID().toString());
		
		//KafkaConfig kafkaConfig = new KafkaConfig(hosts,topic);
		kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
		
		kafkaConfig.scheme= new SchemeAsMultiScheme(new StringScheme());
		
		System.out.println("in runTopologyLocally");
		
		KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
		TopologyBuilder builder = new TopologyBuilder();
		
		System.out.println("creating kafka spout");
		builder.setSpout("kafka-spout",kafkaSpout,2);
		
		System.out.println("set kafka spout");
		builder.setBolt("print-bolt", new PrintBolt(),2).globalGrouping("kafka-spout");
		
		//.fieldsGrouping("kafka-spout", new Fields("word"));
		System.out.println("set print bolt");
		Config config = new Config();
		config.setDebug(true);
		//config.setNumWorkers(3);
		System.out.println("submitting the topology: " + topologyname);
		StormSubmitter.submitTopology(topologyname, config, builder.createTopology());
		System.out.println("topology hasbeen submitted sucessfully");
	}
	
	public TestKafkaStorm(String zkConnect, String topic, String topologyname){
		this.zkConnect = zkConnect; 
		this.topic = topic;
		this.topologyname = topologyname;		
	}
	
	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
		
		TestKafkaStorm tks = new TestKafkaStorm(args[0], args[1], args[2]);
		System.out.println("calling runTopologyLocally");
		tks.runTopologyLocally();

	}

}
