package com.trident.fork.joins.test;
/**
 * @author dkirankumar
 */
import java.util.Map;

import redis.clients.jedis.JedisPool;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class Spout extends BaseRichSpout {

	static final long serialVersionUID = 737015318988609460L;

	SpoutOutputCollector _collector;
	JedisPool pool;

	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		_collector = collector;
	}

	public void close() {
	}

	int i = 1;

	public void nextTuple() {
		if(i > 1){
			Utils.sleep(3 * 1000);
			return;
		}
		
		String requestId = "RequestId_" + i;
		_collector.emit(new Values(requestId));
		i++;
	}

	public void ack(Object msgId) {

	}

	public void fail(Object msgId) {

	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("RequestId"));
	}

	public boolean isDistributed() {
		return false;
	}
}
