package testTCPStream;

import java.util.HashMap;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ericsson.bss.mm1.Cause__3;
import com.ericsson.bss.mm1.SessionId;
import com.fasterxml.jackson.databind.BeanDescription;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.cfg.CoercionAction;
import com.fasterxml.jackson.databind.cfg.CoercionInputShape;
import com.fasterxml.jackson.databind.deser.BeanDeserializer;
import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.type.LogicalType;

import ma.glasnost.orika.CustomMapper;
import ma.glasnost.orika.MapperFacade;
import ma.glasnost.orika.MapperFactory;
import ma.glasnost.orika.MappingContext;
import ma.glasnost.orika.converter.ConverterFactory;
import ma.glasnost.orika.impl.DefaultMapperFactory;

public class FlinkClientApp {

	public static void main(String[] args) throws Exception {

		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// env.disableOperatorChaining();

		HashMapStateBackend embedded = new HashMapStateBackend();
		env.setStateBackend(embedded);

		//env.getCheckpointConfig().setCheckpointStorage("file:///mnt/c/Users/ezmitka/checkpointDir");
		env.getCheckpointConfig().setCheckpointInterval(10000);
		//env.getCheckpointConfig()
				//.setCheckpointStorage(new FileSystemCheckpointStorage("file:///mnt/c/Users/ezmitka/checkpointDir"));
		env.getCheckpointConfig()
				.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
		 
		//env.getCheckpointConfig().enableUnalignedCheckpoints();
		
		env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 5));

		DataStreamSource<String> socketStockStream = env
				.addSource(new ServerCustomSocketStreamFunction(args[0], Integer.parseInt(args[1]), 5, 5000,
						Integer.parseInt(args[2])))
				.setParallelism(2);
		
		DataStream text = socketStockStream.map(new DataConverter()).setParallelism(1).rebalance();

		DataStream textAvro = text.map(new DataAvroConverter())
				.setParallelism(2).rebalance();

		textAvro.print().setParallelism(2);

		env.execute("ebm record stream");
	}

	public static final class DataConverter<T> implements MapFunction<String, T>, ResultTypeQueryable {

		private static final long serialVersionUID = 1L;

		private static ObjectMapper mapper = JsonMapper.builder()
			    //.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
			    .build();
		private static SimpleModule validationModule = new SimpleModule();
		Class recordClazz;

		@Override
		public TypeInformation getProducedType() {
			return TypeExtractor.getForClass(this.recordClazz);
		}

		public DataConverter() throws ClassNotFoundException {
			ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
			this.recordClazz = Class.forName(com.ericsson.bss.mm1.EbmRecordSchema.class.getCanonicalName(), true,
					classLoader);
		}

		static {
			validationModule.setDeserializerModifier(new BeanDeserializerModifier() {

				@Override
				public JsonDeserializer<?> modifyDeserializer(DeserializationConfig config, BeanDescription beanDesc,
						JsonDeserializer<?> deserializer) {
					if (deserializer instanceof BeanDeserializer) {
						return new BeanValidationDeserializer((BeanDeserializer) deserializer);
					}

					return deserializer;
				}
			});
			mapper.registerModule(validationModule);
			mapper.coercionConfigFor(LogicalType.Collection).setCoercion(CoercionInputShape.EmptyString,
					CoercionAction.AsNull);
			//mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
			//mapper.configure(DeserializationFeature.FAIL_ON_UNRESOLVED_OBJECT_IDS, true);
		}

		@Override
		public T map(String arg0) {

			T ebmRecordSchema = null;
		
			try {
				ebmRecordSchema = (T) mapper.readValue(arg0.trim(), this.recordClazz);
			
			} catch (Exception e) {
				e.printStackTrace();
			}

			return ebmRecordSchema;

		}
	}

	public static final class DataAvroConverter<E, T> implements MapFunction<T, E>, ResultTypeQueryable {

		private static final long serialVersionUID = 1L;

		private static MapperFactory mapperFactory = new DefaultMapperFactory.Builder().mapNulls(false).build();
		private static ConverterFactory converterFactory = mapperFactory.getConverterFactory();
		private static MapperFacade mapperFacade = null;
		Class recordClazz;

		public DataAvroConverter() throws ClassNotFoundException {
			ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
			this.recordClazz = Class.forName(com.ericsson.bss.mm1.avro.EbmRecordSchema.class.getCanonicalName(), true,
					classLoader);
		}

		static {
			// custom code based on need to be written in editor window
			customFieldsMapping(mapperFactory);

			converterFactory.registerConverter(new CustomEnumConverter());
			mapperFacade = mapperFactory.getMapperFacade();
		}

		@Override
		public E map(T arg0) {

			// com.ericsson.bss.mm1.avro.EbmRecordSchema dest = null;
			E dest = null;
			try {
				dest = (E) mapperFacade.map(arg0, this.recordClazz);
		
			} catch (Exception e) {
				e.printStackTrace();
			}

			return dest;

		}

		@Override
		public TypeInformation getProducedType() {
			return TypeExtractor.getForClass(this.recordClazz);
		}
	}

	private static void customFieldsMapping(MapperFactory mapperFactory) {
		mapperFactory.classMap(SessionId.class, com.ericsson.bss.mm1.avro.SessionId.class)
				.fieldAToB("interface", "_interface")
				.byDefault().register();
		
		mapperFactory.classMap(Cause__3.class, com.ericsson.bss.mm1.avro.Cause__3.class)
		.customize(new CustomMapper<Cause__3, com.ericsson.bss.mm1.avro.Cause__3>() {

			@Override
			public void mapAtoB(Cause__3 from, com.ericsson.bss.mm1.avro.Cause__3 to, MappingContext context) {
				if (null != from) {
					for (String key : from.getAdditionalProperties().keySet()) {
						to.getAdditionalProperties().put(key, from.getAdditionalProperties().get(key));
					}
				} else {
					to.setAdditionalProperties(new HashMap<>());
				}
			}
		}).byDefault().register();
		

	}
}
