package com.pub.avro;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import junit.framework.TestCase;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.JsonEncoder;
import org.junit.Before;
import org.junit.Test;


public class TestEvolvedSchema2 extends TestCase {


//  {
//    "fields": [
//
//    {
//      "name": "things",
//        "type": {
//      "fields": [
//      {
//        "name": "thing",
//          "type": "int"
//      }
//      ],
//      "name": "ThingRecord",
//          "type": "record"
//    }
//    }
//    ],
//    "name": "ThingPersistenceRecord",
//      "type": "record"
//  }
  private static String V1_PERSISTENCE_SCHEMA = "{\n" + "    \"fields\": [\n" + "        \n" + "        {\n"
      + "            \"name\": \"things\", \n" + "            \"type\": {\n" + "                        \"fields\": [\n"
      + "                            {\n" + "                                \"name\": \"thing\", \n"
      + "                                \"type\": \"int\"\n" + "                            }\n"
      + "                        ], \n" + "                        \"name\": \"ThingRecord\", \n"
      + "                        \"type\": \"record\"\n" + "                    }\n" + "        }\n" + "    ], \n"
      + "    \"name\": \"ThingPersistenceRecord\", \n" + "    \"type\": \"record\"\n" + "}";


//  {
//    "fields": [
//
//    {
//      "name": "things",
//        "type": {
//      "fields": [
//      {
//        "name": "thing",
//          "type": "int"
//      },
//      {
//        "name": "superthing",
//          "type": ["null", "int"],
//        "default": null
//      }
//      ],
//      "name": "ThingRecord",
//          "type": "record"
//    }
//    }
//    ],
//    "name": "ThingPersistenceRecord",
//      "type": "record"
//  }
  private static String V2_PERSISTENCE_SCHEMA = "{\n" + "    \"fields\": [\n" + "        \n" + "        {\n"
      + "            \"name\": \"things\", \n" + "            \"type\": {\n" + "                        \"fields\": [\n"
      + "                            {\n" + "                                \"name\": \"thing\", \n"
      + "                                \"type\": \"int\"\n" + "                            }, \n"
      + "                            {\n" + "                                \"name\": \"superthing\", \n"
      + "                                \"type\": [\"null\", \"int\"],\n"
      + "                                \"default\": null\n" + "                            } \n"
      + "                        ], \n" + "                        \"name\": \"ThingRecord\", \n"
      + "                        \"type\": \"record\"\n" + "                    } \n" + "        }\n" + "    ], \n"
      + "    \"name\": \"ThingPersistenceRecord\", \n" + "    \"type\": \"record\"\n" + "}";



//  {
//    "name": "Message",
//      "type": "record",
//      "fields": [
//    {"name": "things", "type": {"type": "record",
//        "name": "ThingRecord",
//        "fields": [
//      {"name": "thing", "type": "int"}
//      ]
//    }
//    }
//    ]
//  }
  private static String V1_MESSAGE_SCHEMA = "{\n" + "  \"name\": \"Message\",\n" + "  \"type\": \"record\",\n"
      + "  \"fields\": [\n" + "    {\"name\": \"things\", \"type\": {\"type\": \"record\",\n"
      + "                 \"name\": \"ThingRecord\",\n" + "                 \"fields\": [\n"
      + "                    {\"name\": \"thing\", \"type\": \"int\"}\n" + "                 ]\n"
      + "                }\n" + "    }\n" + "  ]\n" + "}";



  GenericData.Record v1PersistenceRecord;
  GenericData.Record v2PersistenceRecord;
  GenericRecord v1Message;

  private static final Schema v1MessageSchema = Schema.parse(V1_MESSAGE_SCHEMA);
  private static final Schema v1PersistenceSchema = Schema.parse(V1_PERSISTENCE_SCHEMA);
  private static final Schema v2PersistenceSchema = Schema.parse(V2_PERSISTENCE_SCHEMA);

  @Before
  public void setUp()
      throws Exception {

    v1PersistenceRecord = new GenericData.Record(v1PersistenceSchema);
    v2PersistenceRecord = new GenericData.Record(v2PersistenceSchema);
    v1Message = jsonToGenericRecord("{\"things\":{\"thing\":8675309}}", v1MessageSchema);
  }

  @Test
  public void testAvroNormalUpdate() throws IOException {
    overwriteRecord(v1Message, v1PersistenceRecord);

    String json = genericRecordToJson(v1PersistenceRecord);
    assertNotNull(json);
  }

  @Test
  public void testAvroEvolvedUpdate() throws IOException {
    overwriteRecord(v1Message, v2PersistenceRecord);

    String json = genericRecordToJson(v2PersistenceRecord);
    assertNotNull(json);
  }

  @Test
  public void testAvroEvolvedValidate() throws IOException {
    overwriteRecord(v1Message, v2PersistenceRecord);

    try {
      validateFieldSchema(v1Message, v2PersistenceRecord, "things", v1Message.get("things"));
    } catch (Exception e) {
      e.printStackTrace();
      fail("Should not get an exception");
    }
  }


  private boolean validateFieldSchema(GenericRecord source, GenericRecord sink, String fieldName, Object override) {
    boolean valid = true;

    // Source
    Schema.Field sourceField = source.getSchema().getField(fieldName);
    Schema sourceFieldSchema = null;
    if (sourceField != null) {
      sourceFieldSchema = sourceField.schema();
    }

    // Sink
    Schema.Field sinkField = sink.getSchema().getField(fieldName);
    Schema sinkFieldSchema = null;
    if (sinkField != null) {
      sinkFieldSchema = sinkField.schema();
    }

    // Compare
    if (sourceFieldSchema == null || sinkFieldSchema == null || !sourceFieldSchema.equals(sinkFieldSchema)) {
      valid &= GenericData.get().validate(sourceFieldSchema, override);
      valid &= GenericData.get().validate(sinkFieldSchema, override);
    }

    return valid;
  }

  /**
   * Overwrites data in the sink record using all appropriate fields from the source record.
   */
  public void overwriteRecord(GenericRecord source, GenericRecord sink) {
    List<Schema.Field> fields = source.getSchema().getFields();
    for (Schema.Field field: fields) {
      String fieldName = field.name();

      overwriteField(source, sink, field);
    }
  }

  /**
   * Overwrites a particular field in the sink record using data from the source record.  The default implementation
   * assumes that each record schema contains the field and that the data from the source record can be copied into
   * the sink record unaltered.
   *
   */
  protected void overwriteField(GenericRecord source, GenericRecord sink, Schema.Field field) {
    String fieldName = field.name();
    Object override = source.get(field.name());

    sink.put(fieldName, override);
  }

  public static <T extends GenericRecord> T jsonToGenericRecord(String s_json, Schema schema) throws Exception{
    InputStream jsonStream = new ByteArrayInputStream(s_json.getBytes("UTF-8"));

    GenericDatumReader<T> reader = new GenericDatumReader<T>(schema);
    Decoder decoder = new JsonDecoder(schema, jsonStream);

    T eventObject = reader.read(null, decoder);
    return eventObject;
  }


  public static String genericRecordToJson(GenericContainer record,Schema recordSchema) throws IOException {
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    DatumWriter<GenericContainer> dw = new GenericDatumWriter<GenericContainer>(recordSchema);
    Encoder encoder = new JsonEncoder(recordSchema, outputStream);
    dw.write(record, encoder);
    encoder.flush();
    return outputStream.toString("UTF-8");
  }

  public static String genericRecordToJson(GenericContainer record) throws IOException {
    return genericRecordToJson(record, record.getSchema());
  }
}