package com.eka.test;

import java.util.AbstractMap;
import java.util.Properties;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.stream.kafka.KafkaStreamer;

import kafka.consumer.ConsumerConfig;

/**
 * Hello world!
 *
 */
public class App {
	public static void main(String[] args) {

		IgniteConfiguration cfg = new IgniteConfiguration();
		Ignite ignite = Ignition.start(cfg);
		KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();

		try {
			// Create the cache.
			IgniteCache<String, String> cache = ignite.getOrCreateCache(new CacheConfiguration<String,String>("IKAFKA"));

			IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("IKAFKA");
			// allow overwriting cache data
			stmr.allowOverwrite(true);

			kafkaStreamer.setIgnite(ignite);
			kafkaStreamer.setStreamer(stmr);

			// set the topic
			kafkaStreamer.setTopic("ignite-kafka");

			// set the number of threads to process Kafka streams
			kafkaStreamer.setThreads(1);

			// set Kafka consumer configurations
			Properties props = new Properties();
			props.put("bootstrap.servers", "192.168.1.66:9092");
			props.put("zookeeper.connect", "192.168.1.66:2181");
			props.put("group.id", "test");
			props.put("enable.auto.commit", "true");
			props.put("auto.commit.interval.ms", "1000");
			props.put("session.timeout.ms", "30000");
			kafkaStreamer.setConsumerConfig(new ConsumerConfig(props));

			kafkaStreamer.setSingleTupleExtractor(msg -> {
				return new AbstractMap.SimpleEntry<String, String>(new String(msg.key()), new String(msg.message()));
			});

			kafkaStreamer.start();

			System.out.println(ignite.cache("IKAFKA"));

		} finally {
			// kafkaStreamer.stop();
			System.out.println("tested sink connect");
		}
	}
}
