package com.mediav.rtlj.topology;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.mediav.rtlj.util.GZipUtils;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;
import org.slf4j.Logger;

/**
 * Kryo {@link Serializer} for Thrift objects.
 *
 * <p>Each value is encoded with Thrift's {@link TCompactProtocol}, passed through
 * {@link GZipUtils#zip}, and written as a length-prefixed byte array:
 *
 * <pre>
 *   varint(payloadLength + DELTA)  payloadBytes...
 * </pre>
 *
 * A prefix equal to {@code EMPTY + DELTA} with no payload represents {@code null}. The same
 * marker is written when Thrift serialization fails, so such a value is read back as
 * {@code null} (the failure is logged, not propagated).
 *
 * @param <T> the Thrift type handled by this serializer; {@link #read} creates instances via
 *            the class's no-argument constructor
 * @author zhangyf
 * @version 1.0 2016-02-19
 */

public class GZipThriftSerializer<T extends TBase> extends Serializer<T> {

  /** Payload length that stands for "no object" (null, or a value that failed to serialize). */
  private static final int EMPTY = 0;

  // add DELTA to make length positive due to kryo internal optimization
  // (the prefix is written/read with optimizePositive = true)
  private static final int DELTA = 1;

  private static final Logger LOG = com.mediav.utils.Loggers.get();

  // One Thrift serializer per thread; the instance is reused across write() calls on that thread.
  private static final ThreadLocal<TSerializer> COMPACT_T_SERIALIZER = new ThreadLocal<TSerializer>() {
    @Override
    protected TSerializer initialValue() {
      return new TSerializer(new TCompactProtocol.Factory());
    }
  };

  // One Thrift deserializer per thread; the instance is reused across read() calls on that thread.
  private static final ThreadLocal<TDeserializer> COMPACT_T_DESERIALIZER = new ThreadLocal<TDeserializer>() {
    @Override
    protected TDeserializer initialValue() {
      return new TDeserializer(new TCompactProtocol.Factory());
    }
  };

  {
    // This serializer encodes null itself (see write/read), so let Kryo pass nulls through.
    setAcceptsNull(true);
  }

  /**
   * Writes {@code t} as a length-prefixed, compressed Thrift compact-protocol payload.
   *
   * <p>{@code null} is written as the bare empty marker. If Thrift serialization throws a
   * {@link TException}, the error is logged and the empty marker is written instead, so the
   * stream stays well-formed and the value is later read back as {@code null}.
   *
   * @param kryo   the Kryo instance (unused)
   * @param output the destination to write to
   * @param t      the Thrift object to write; may be {@code null}
   */
  @Override
  public void write(Kryo kryo, Output output, T t) {
    if (t == null) {
      output.writeInt(EMPTY + DELTA, true);
      return;
    }
    try {
      byte[] data = COMPACT_T_SERIALIZER.get().serialize(t);
      byte[] compressed = GZipUtils.zip(data);
      // Nothing is written until the payload is fully built, so a failure above
      // cannot leave a half-written record in the output.
      output.writeInt(compressed.length + DELTA, true);
      output.writeBytes(compressed);
    } catch (TException e) {
      LOG.error("TException during serialization", e);
      output.writeInt(EMPTY + DELTA, true);
    }
  }

  /**
   * Reads a value previously produced by {@link #write}.
   *
   * @param kryo   the Kryo instance (unused)
   * @param input  the source to read from
   * @param aClass the concrete Thrift class to instantiate
   * @return the decoded object, or {@code null} if the empty marker was read or if Thrift
   *         deserialization failed (the failure is logged)
   * @throws IllegalStateException if the length prefix decodes to a negative payload length,
   *                               which {@link #write} never produces
   * @throws RuntimeException      if {@code aClass} cannot be instantiated through its
   *                               no-argument constructor
   */
  @Override
  public T read(Kryo kryo, Input input, Class<T> aClass) {
    int length = input.readInt(true) - DELTA;
    if (length == EMPTY) {
      return null;
    }
    if (length < 0) {
      // Fail with a descriptive error instead of a NegativeArraySizeException from readBytes.
      throw new IllegalStateException(
          "Invalid payload length " + length + " while reading " + aClass.getName());
    }
    try {
      T t = aClass.newInstance();
      COMPACT_T_DESERIALIZER.get().deserialize(t, GZipUtils.unzip(input.readBytes(length)));
      return t;
    } catch (InstantiationException e) {
      throw new RuntimeException("Cannot instantiate " + aClass.getName(), e);
    } catch (IllegalAccessException e) {
      throw new RuntimeException("Cannot access no-arg constructor of " + aClass.getName(), e);
    } catch (TException e) {
      // Best effort: the payload bytes were already consumed, so the input stays aligned.
      LOG.error("TException during deserialization", e);
    }
    return null;
  }
}
