package net.juniper.cs.cache;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.stream.StreamAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;
import com.test.cs.entity.Person;

public class KafkaCacheDataStreamer<T, K, V> extends StreamAdapter<T, K, V> {
    
	private static Logger log = LoggerFactory.getLogger(KafkaCacheDataStreamer.class);
	
	/** Retry timeout. */
    private static final long DFLT_RETRY_TIMEOUT = 10000;

    /** Executor used to submit kafka streams. */
    private ExecutorService executor;

    /** Topic. */
    private String topic;

    /** Number of threads to process kafka streams. */
    private int threads;

    /** Kafka consumer config. */
    private ConsumerConfig consumerCfg;

    /** Key decoder. */
    private Decoder<T> keyDecoder;

    /** Value decoder. */
    private Decoder<T> valDecoder;

    /** Kafka consumer connector. */
    private ConsumerConnector consumer;

    /** Retry timeout. */
    private long retryTimeout = DFLT_RETRY_TIMEOUT;

    /** Stopped. */
    private volatile boolean stopped;

    private IgniteCache<String, Person> cache = CacheManager.getPersonCache();
    
    /**
     * Sets the topic name.
     *
     * @param topic Topic name.
     */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /**
     * Sets the threads.
     *
     * @param threads Number of threads.
     */
    public void setThreads(int threads) {
        this.threads = threads;
    }

    /**
     * Sets the consumer config.
     *
     * @param consumerCfg Consumer configuration.
     */
    public void setConsumerConfig(ConsumerConfig consumerCfg) {
        this.consumerCfg = consumerCfg;
    }

    /**
     * Sets the key decoder.
     *
     * @param keyDecoder Key decoder.
     */
    public void setKeyDecoder(Decoder<T> keyDecoder) {
        this.keyDecoder = keyDecoder;
    }

    /**
     * Sets the value decoder.
     *
     * @param valDecoder Value decoder.
     */
    public void setValueDecoder(Decoder<T> valDecoder) {
        this.valDecoder = valDecoder;
    }

    /**
     * Sets the retry timeout.
     *
     * @param retryTimeout Retry timeout.
     */
    public void setRetryTimeout(long retryTimeout) {
    	if (retryTimeout <= 0){
    		throw new IllegalArgumentException("retryTimeout mist be greater than 0");
    		
    	}
        this.retryTimeout = retryTimeout;
    }

    
    
    @Override
    protected void addMessage(T msg) {
    	try {
    		if (getMultipleTupleExtractor() == null){
                Map.Entry<K, V> e = getSingleTupleExtractor().extract(msg);

                if (e != null)
                    getStreamer().addData(e);

            } else {
                Map<K, V> m = getMultipleTupleExtractor().extract(msg);
                if (m != null && !m.isEmpty()){
                	/*for (Map.Entry<K, V> entry : m.entrySet()){
                		getStreamer().addData(entry.getKey(), entry.getValue());
//                		cache.put((String)entry.getKey(), (Person) entry.getValue());
                	}*/
                	
                	getStreamer().addData(m);

                }
            }
	
    	}catch(Exception ex){
    		log.error("Exception while adding to streamer ", ex);
    	}
    }
    /**
     * Starts streamer.
     *
     * @throws IgniteException If failed.
     */
    public void start() {
        Objects.requireNonNull(getStreamer(), "streamer");
        Objects.requireNonNull(getIgnite(), "ignite");
        Objects.requireNonNull(topic, "topic");
        Objects.requireNonNull(keyDecoder, "key decoder");
        Objects.requireNonNull(valDecoder, "value decoder");
        
        Objects.requireNonNull(consumerCfg, "kafka consumer config");
        if(threads <= 0){
        	throw new IllegalArgumentException("Threads cannot be zero");
        }

        
        
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerCfg);

        Map<String, Integer> topicCntMap = new HashMap<>();

        topicCntMap.put(topic, threads);

        Map<String, List<KafkaStream<T, T>>> consumerMap =
            consumer.createMessageStreams(topicCntMap, keyDecoder, valDecoder);

        List<KafkaStream<T, T>> streams = consumerMap.get(topic);
        
        log.info("Kafka is connected successfully");        
        // Now launch all the consumer threads.
        executor = Executors.newFixedThreadPool(threads);

        stopped = false;

        // Now create an object to consume the messages.
        for (final KafkaStream<T, T> stream : streams) {
            executor.submit(new Runnable() {
                @Override public void run() {
                    while (!stopped) {
                        try {
                            for (ConsumerIterator<T, T> it = stream.iterator(); it.hasNext() && !stopped; ) {
                                MessageAndMetadata<T, T> msg = it.next();
                                log.info("Message recieved {} ", msg.message()); 
                                try {
                                	addMessage(msg.message());
                                }
                                catch (Exception e) {
                                    log.error("Message is ignored due to an error [msg=" + msg + ']', e);
                                }
                            }
                        }
                        catch (Exception e) {
                            log.error("Message can't be consumed from stream. Retry after " + retryTimeout + " ms.", e);

                            try {
                                Thread.sleep(retryTimeout);
                            }
                            catch (InterruptedException ie) {
                                // No-op.
                            }
                        }
                    }
                }
            });
        }
    }

    /**
     * Stops streamer.
     */
    public void stop() {
        stopped = true;

        if (consumer != null)
            consumer.shutdown();

        if (executor != null) {
            executor.shutdown();

            try {
                if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS))
                    if (log.isDebugEnabled())
                        log.debug("Timed out waiting for consumer threads to shut down, exiting uncleanly.");
            }
            catch (InterruptedException e) {
                if (log.isDebugEnabled())
                    log.debug("Interrupted during shutdown, exiting uncleanly.");
            }
        }
    }

}
