package streamtest;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

/**
 * Serializer snapshot for {@code DynamicGenericRecordSerializer}.
 *
 * <p>The snapshot persists only the schema-registry client configuration (base URL,
 * retry count and three string-to-string config maps); the schema cache itself is
 * not stored. {@link #restoreSerializer()} builds a new serializer from that
 * configuration.
 *
 * <p>Binary layout (version 1), in order: base URL (UTF), max retries (int), then
 * the registry, IAM SSL and IAM token maps, each written as an int entry count
 * followed by {@code count} (UTF key, UTF value) pairs.
 */
public class DynamicGenericRecordSerializerSnapshot implements TypeSerializerSnapshot<GenericRecord> {

    /** The only snapshot layout version this class writes and understands. */
    private static final int CURRENT_VERSION = 1;

    // Store only the endpoint and related parameters, not the actual schema cache.
    // Stays null after the no-arg constructor until readSnapshot() populates it.
    private SchemaRegistryClientConfig config;

    /** Required empty constructor; state is filled in later by {@link #readSnapshot}. */
    public DynamicGenericRecordSerializerSnapshot() {}

    /**
     * Creates a snapshot that captures the given configuration.
     *
     * @param config the schema-registry client configuration to persist
     */
    public DynamicGenericRecordSerializerSnapshot(SchemaRegistryClientConfig config) {
        this.config = config;
    }

    /** @return the version of the binary layout produced by {@link #writeSnapshot} */
    @Override
    public int getCurrentVersion() {
        return CURRENT_VERSION;
    }

    /**
     * Writes the configuration to {@code out} using the version-1 layout.
     *
     * @param out the target to write to
     * @throws IOException if writing fails or a config map contains a null key or value
     * @throws IllegalStateException if this snapshot holds no configuration
     */
    @Override
    public void writeSnapshot(DataOutputView out) throws IOException {
        if (config == null) {
            // Fail with a clear message instead of an anonymous NullPointerException.
            throw new IllegalStateException(
                    "Cannot write snapshot: no SchemaRegistryClientConfig has been set");
        }
        out.writeUTF(config.getSrBaseUrl());
        out.writeInt(config.getSrMaxRetries());
        writeMap(out, config.getRegistryConfigs());
        writeMap(out, config.getIamSSLConfigs());
        writeMap(out, config.getIamConfigsForToken());
        // Deliberately no console output here: the configuration includes IAM/SSL
        // settings that may contain credentials and must not end up in stdout.
    }

    /**
     * Reads the configuration back from {@code in}.
     *
     * @param version the layout version the snapshot was written with
     * @param in the source to read from
     * @param userCodeClassLoader unused; every persisted value is a string or an int
     * @throws IOException if reading fails, the version is unknown, or the data is malformed
     */
    @Override
    public void readSnapshot(int version, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
        if (version != CURRENT_VERSION) {
            // Parsing an unknown layout as version 1 would silently yield garbage.
            throw new IOException("Unsupported snapshot version " + version
                    + "; expected " + CURRENT_VERSION);
        }

        String baseUrl = in.readUTF();
        int maxRetries = in.readInt();
        Map<String, String> registryConfigs = readMap(in);
        Map<String, String> iamSSLConfigs = readMap(in);
        Map<String, String> iamConfigs = readMap(in);

        this.config = new SchemaRegistryClientConfig(baseUrl, maxRetries, registryConfigs, iamSSLConfigs, iamConfigs);
    }

    /**
     * Builds a serializer from the configuration held by this snapshot.
     *
     * @return a new {@code DynamicGenericRecordSerializer} constructed with the stored config
     */
    @Override
    public TypeSerializer<GenericRecord> restoreSerializer() {
        return new DynamicGenericRecordSerializer(config);
    }

    /**
     * Writes a string map as an int entry count followed by UTF key/value pairs.
     * A {@code null} map is written as an empty map, so it is read back as an
     * empty (non-null) map.
     *
     * @throws IOException if writing fails or an entry has a null key or value
     */
    private void writeMap(DataOutputView out, Map<String, String> map) throws IOException {
        if (map == null) {
            out.writeInt(0);
            return;
        }
        out.writeInt(map.size());
        for (Map.Entry<String, String> entry : map.entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            if (key == null || value == null) {
                // writeUTF cannot encode null; name the key but never the value,
                // since values may be secrets.
                throw new IOException(
                        "Cannot snapshot config entry with null key or value (key=" + key + ")");
            }
            out.writeUTF(key);
            out.writeUTF(value);
        }
    }

    /**
     * Reads a map previously written by {@link #writeMap}.
     *
     * @return a new mutable map holding the entries that were read
     * @throws IOException if reading fails or the stored entry count is negative
     */
    private Map<String, String> readMap(DataInputView in) throws IOException {
        int size = in.readInt();
        if (size < 0) {
            throw new IOException("Corrupt snapshot: negative map size " + size);
        }
        Map<String, String> map = new HashMap<>();
        for (int i = 0; i < size; i++) {
            // Read key and value into locals so the read order is explicit.
            String key = in.readUTF();
            String value = in.readUTF();
            map.put(key, value);
        }
        return map;
    }

    /**
     * Reports whether {@code newSerializer} can read data written by the
     * serializer this snapshot belongs to.
     *
     * @param newSerializer the serializer registered for the restored state
     * @return {@code compatibleAsIs} for a {@code DynamicGenericRecordSerializer},
     *         {@code incompatible} for any other serializer (including {@code null})
     */
    @Override
    public TypeSerializerSchemaCompatibility<GenericRecord> resolveSchemaCompatibility(TypeSerializer<GenericRecord> newSerializer) {
        if (!(newSerializer instanceof DynamicGenericRecordSerializer)) {
            // A serializer of an unrelated class cannot be assumed to share this
            // serializer's binary format.
            return TypeSerializerSchemaCompatibility.incompatible();
        }
        return TypeSerializerSchemaCompatibility.compatibleAsIs();
    }
}

