package streamtest;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

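/**
 * A Flink {@link TypeSerializer} for Avro {@link GenericRecord}s whose schema is not known at
 * job compile time. Each record is written as a length-prefixed UTF-8 schema JSON string followed
 * by the length-prefixed Avro binary encoding of the record itself, so the reader can rebuild the
 * schema on the fly. This is flexible but adds per-record overhead compared to serializers that
 * fix the schema up front.
 */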
public class DynamicGenericRecordSerializer extends TypeSerializer<GenericRecord> {

	private static final long serialVersionUID = 1L;

	@Override
	public boolean isImmutableType() {
		return false;
	}

	@Override
	public TypeSerializer<GenericRecord> duplicate() {
		// the serializer keeps no mutable state, so sharing a single instance is safe
		return this;
	}

	@Override
	public GenericRecord createInstance() {
		// no meaningful default instance can be created without a schema
		return null;
	}

	@Override
	public GenericRecord copy(GenericRecord from) {
		// Avro records are mutable, so hand Flink a deep copy to keep object reuse safe
		return from == null ? null : GenericData.get().deepCopy(from.getSchema(), from);
	}

	@Override
	public GenericRecord copy(GenericRecord from, GenericRecord reuse) {
		return copy(from);
	}

	@Override
	public int getLength() {
		return -1; // variable size
	}

	@Override
	public void serialize(GenericRecord record, DataOutputView target) throws IOException {
		System.out.println("serialize method is called");
		// Serialize schema as string

		String schemaStr = record.getSchema().toString();
		byte[] schemaBytes = schemaStr.getBytes(StandardCharsets.UTF_8);
		target.writeInt(schemaBytes.length);
		target.write(schemaBytes);

		// Encode the record body with Avro binary encoding and write it length-prefixed
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
		BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
		writer.write(record, encoder);
		encoder.flush();
		byte[] recordBytes = out.toByteArray();

		target.writeInt(recordBytes.length);
		target.write(recordBytes);
	}

	@Override
	public GenericRecord deserialize(DataInputView source) throws IOException {
		System.out.println("deserialize method is called");
		// Read schema

		int schemaLength = source.readInt();
		byte[] schemaBytes = new byte[schemaLength];
		source.readFully(schemaBytes);
		String schemaStr = new String(schemaBytes, StandardCharsets.UTF_8);
		Schema schema = new Schema.Parser().parse(schemaStr);

		// Read record
		int recordLength = source.readInt();
		byte[] recordBytes = new byte[recordLength];
		source.readFully(recordBytes);

		DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
		BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(recordBytes, null);
		return reader.read(null, decoder);
	}
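
	// Note: every deserialize() call re-parses the embedded schema JSON. If a stream carries
	// only a handful of distinct schemas, caching the parsed Schema by its JSON string avoids
	// the repeated parsing cost. A sketch, assuming a cache field is acceptable (hypothetical,
	// not part of the original class):
	//
	//   private final java.util.Map<String, Schema> schemaCache = new java.util.HashMap<>();
	//   Schema schema = schemaCache.computeIfAbsent(schemaStr, s -> new Schema.Parser().parse(s));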

	@Override
	public GenericRecord deserialize(GenericRecord reuse, DataInputView source) throws IOException {
		return deserialize(source); // no reuse
	}

	@Override
	public void copy(DataInputView source, DataOutputView target) throws IOException {
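		// Correct but not the cheapest option: this decodes and re-encodes the record, paying the
		// schema parse and Avro round-trip on every copy; copying the two length-prefixed byte
		// blocks verbatim would also work here.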
		GenericRecord record = deserialize(source);
		serialize(record, target);
	}

	@Override
	public boolean equals(Object obj) {
		return obj instanceof DynamicGenericRecordSerializer;
	}

	@Override
	public int hashCode() {
		return DynamicGenericRecordSerializer.class.hashCode();
	}

	@Override
	public TypeSerializerSnapshot<GenericRecord> snapshotConfiguration() {
		return new DynamicGenericRecordSerializerSnapshot();
	}
}
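
// The DynamicGenericRecordSerializerSnapshot referenced above is not defined in this file. Since
// the serializer carries no configuration, a minimal sketch (an assumption, not necessarily the
// original implementation) could extend Flink's SimpleTypeSerializerSnapshot, which restores a
// stateless serializer from a supplier:
//
//   public class DynamicGenericRecordSerializerSnapshot
//           extends SimpleTypeSerializerSnapshot<GenericRecord> {
//
//       public DynamicGenericRecordSerializerSnapshot() {
//           super(DynamicGenericRecordSerializer::new);
//       }
//   }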
