package com.pub.avro;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import junit.framework.TestCase;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.JsonEncoder;
import org.junit.Before;
import org.junit.Test;


public class TestEvolvedSchema extends TestCase {


//  {
//    "fields": [
//
//    {
//      "default": null,
//        "name": "things",
//        "type": [
//      "null",
//          {
//              "items": {
//      "fields": [
//      {
//        "name": "thing",
//          "type": "int"
//      },
//      {
//        "name": "score",
//          "type": "float"
//      }
//      ],
//      "name": "ThingRecord",
//          "type": "record"
//    },
//      "type": "array"
//      }
//      ]
//    }
//    ],
//    "name": "ThingPersistenceRecord",
//      "type": "record"
//  }
  /**
   * Avro schema (as JSON text) for version 1 of the persistence record.
   *
   * <p>Declares a record named {@code ThingPersistenceRecord} with a single field {@code things}, whose type is a
   * union of {@code null} and an array of {@code ThingRecord} (fields: {@code thing} int, {@code score} float), and
   * whose default is {@code null}.
   */
  private static final String V1_PERSISTENCE_SCHEMA = "{\n" + "    \"fields\": [\n" + "        \n" + "        {\n"
    + "            \"default\": null, \n" + "            \"name\": \"things\", \n" + "            \"type\": [\n"
    + "                \"null\", \n" + "                {\n" + "                    \"items\": {\n"
    + "                        \"fields\": [\n" + "                            {\n"
    + "                                \"name\": \"thing\", \n" + "                                \"type\": \"int\"\n"
    + "                            }, \n" + "                            {\n"
    + "                                \"name\": \"score\", \n"
    + "                                \"type\": \"float\"\n" + "                            }\n"
    + "                        ], \n" + "                        \"name\": \"ThingRecord\", \n"
    + "                        \"type\": \"record\"\n" + "                    }, \n"
    + "                    \"type\": \"array\"\n" + "                }\n" + "            ]\n" + "        }\n"
    + "    ], \n" + "    \"name\": \"ThingPersistenceRecord\", \n" + "    \"type\": \"record\"\n" + "}";

//  {
//    "fields": [
//
//    {
//      "default": null,
//        "name": "things",
//        "type": [
//      "null",
//          {
//              "items": {
//      "fields": [
//      {
//        "name": "thing",
//          "type": "int"
//      },
//      {
//        "name": "score",
//          "type": "float"
//      },
//      {
//        "name": "superthing",
//          "type": ["null", "int"],
//        "default": null
//      },
//      {
//        "name": "superthingscore",
//          "type": ["null", "float"],
//        "default": null
//      }
//      ],
//      "name": "ThingRecord",
//          "type": "record"
//    },
//      "type": "array"
//      }
//      ]
//    }
//    ],
//    "name": "ThingPersistenceRecord",
//      "type": "record"
//  }
  /**
   * Avro schema (as JSON text) for version 2 of the persistence record.
   *
   * <p>Declares a record named {@code ThingPersistenceRecord} with a single field {@code things}, whose type is a
   * union of {@code null} and an array of {@code ThingRecord}, and whose default is {@code null}. Here
   * {@code ThingRecord} carries {@code thing} (int) and {@code score} (float) plus two nullable fields that default
   * to {@code null}: {@code superthing} (null or int) and {@code superthingscore} (null or float).
   */
  private static final String V2_PERSISTENCE_SCHEMA = "{\n" + "    \"fields\": [\n" + "        \n" + "        {\n"
    + "            \"default\": null, \n" + "            \"name\": \"things\", \n" + "            \"type\": [\n"
    + "                \"null\", \n" + "                {\n" + "                    \"items\": {\n"
    + "                        \"fields\": [\n" + "                            {\n"
    + "                                \"name\": \"thing\", \n" + "                                \"type\": \"int\"\n"
    + "                            }, \n" + "                            {\n"
    + "                                \"name\": \"score\", \n"
    + "                                \"type\": \"float\"\n" + "                            },\n"
    + "                            {\n" + "                                \"name\": \"superthing\", \n"
    + "                                \"type\": [\"null\", \"int\"],\n"
    + "                                \"default\": null\n" + "                            }, \n"
    + "                            {\n" + "                                \"name\": \"superthingscore\", \n"
    + "                                \"type\": [\"null\", \"float\"],\n"
    + "                                \"default\": null\n" + "                            }\n"
    + "                        ], \n" + "                        \"name\": \"ThingRecord\", \n"
    + "                        \"type\": \"record\"\n" + "                    }, \n"
    + "                    \"type\": \"array\"\n" + "                }\n" + "            ]\n" + "        }\n"
    + "    ], \n" + "    \"name\": \"ThingPersistenceRecord\", \n" + "    \"type\": \"record\"\n" + "}";


//  {
//    "name": "Message",
//      "type": "record",
//      "fields": [
//    {"name": "things", "type": [
//      "null",
//          {"type": "array",
//        "items": {"type": "record",
//        "name": "ThingRecord",
//        "fields": [
//      {"name": "thing", "type": "int"},
//      {"name": "score", "type": "float"}
//      ]
//    }
//      }
//      ],
//      "default": null
//    }
//    ]
//  }
  /**
   * Avro schema (as JSON text) for version 1 of the incoming message.
   *
   * <p>Declares a record named {@code Message} with a single field {@code things}, whose type is a union of
   * {@code null} and an array of {@code ThingRecord} (fields: {@code thing} int, {@code score} float), and whose
   * default is {@code null}.
   */
  private static final String V1_MESSAGE_SCHEMA = "{\n" + "  \"name\": \"Message\",\n" + "  \"type\": \"record\",\n"
    + "  \"fields\": [\n" + "    {\"name\": \"things\", \"type\": [\n" + "      \"null\",\n"
    + "      {\"type\": \"array\",\n" + "       \"items\": {\"type\": \"record\",\n"
    + "                 \"name\": \"ThingRecord\",\n" + "                 \"fields\": [\n"
    + "                    {\"name\": \"thing\", \"type\": \"int\"},\n"
    + "                    {\"name\": \"score\", \"type\": \"float\"}\n" + "                 ]\n"
    + "                }\n" + "      }\n" + "      ],\n" + "      \"default\": null\n" + "    }\n" + "  ]\n" + "}";



  // Per-test fixtures; each one is (re)assigned in setUp().
  // Sink record built on the v1 persistence schema.
  GenericData.Record v1PersistenceRecord;
  // Sink record built on the v2 persistence schema.
  GenericData.Record v2PersistenceRecord;
  // Source record decoded from a JSON sample using the v1 message schema.
  GenericRecord v1Message;

  // Schemas parsed once, at class initialization, from the JSON constants declared above.
  private static final Schema v1MessageSchema = Schema.parse(V1_MESSAGE_SCHEMA);
  private static final Schema v1PersistenceSchema = Schema.parse(V1_PERSISTENCE_SCHEMA);
  private static final Schema v2PersistenceSchema = Schema.parse(V2_PERSISTENCE_SCHEMA);

  /**
   * Builds the fixtures used by every test: one empty persistence record per schema version and a v1 message
   * decoded from a fixed JSON sample.
   *
   * @throws Exception if the sample JSON cannot be decoded against the v1 message schema
   */
  @Before
  public void setUp()
      throws Exception {
    final String sampleMessageJson = "{\"things\":{\"array\":[{\"thing\":8675309,\"score\":0.0999}]}}";

    // The sink records start out empty; the tests fill them from the message.
    this.v1PersistenceRecord = new GenericData.Record(v1PersistenceSchema);
    this.v2PersistenceRecord = new GenericData.Record(v2PersistenceSchema);
    this.v1Message = jsonToGenericRecord(sampleMessageJson, v1MessageSchema);
  }

  /**
   * Copies the v1 message into the v1 persistence record and checks that the result serializes to non-null JSON.
   */
  @Test
  public void testAvroNormalUpdate() throws IOException {
    final GenericRecord sink = v1PersistenceRecord;
    overwriteRecord(v1Message, sink);
    assertNotNull(genericRecordToJson(sink));
  }

  /**
   * Copies the v1 message into the v2 persistence record and checks that the result serializes to non-null JSON.
   */
  @Test
  public void testAvroEvolvedUpdate() throws IOException {
    final GenericRecord sink = v2PersistenceRecord;
    overwriteRecord(v1Message, sink);
    assertNotNull(genericRecordToJson(sink));
  }


  /**
   * Overwrites data in the sink record using the fields of the source record.
   *
   * <p>Iterates over every field declared by the source record's schema and delegates the per-field copy to
   * {@link #overwriteField(GenericRecord, GenericRecord, Schema.Field)}.
   *
   * @param source record whose schema drives the iteration and whose values are read
   * @param sink record that receives the values
   */
  public void overwriteRecord(GenericRecord source, GenericRecord sink) {
    for (Schema.Field field : source.getSchema().getFields()) {
      overwriteField(source, sink, field);
    }
  }

  /**
   * Copies one field's value from the source record into the sink record, looked up by the field's name on both
   * sides.  This default implementation assumes that both record schemas contain the field and that the source
   * value can be stored in the sink unaltered.
   *
   * @param source record the value is read from
   * @param sink record the value is written to
   * @param field field (taken from the source schema) whose name selects the value to copy
   */
  protected void overwriteField(GenericRecord source, GenericRecord sink, Schema.Field field) {
    sink.put(field.name(), source.get(field.name()));
  }

  /**
   * Decodes a JSON document into a generic Avro record using the given schema.
   *
   * @param s_json JSON text; it is converted to UTF-8 bytes before decoding
   * @param schema schema used both to drive the JSON decoder and to build the resulting record
   * @param <T> record type expected by the caller
   * @return the record read from the JSON text
   * @throws Exception if the text cannot be encoded or decoded
   */
  public static <T extends GenericRecord> T jsonToGenericRecord(String s_json, Schema schema) throws Exception{
    final byte[] utf8Bytes = s_json.getBytes("UTF-8");

    final GenericDatumReader<T> datumReader = new GenericDatumReader<T>(schema);
    final Decoder jsonDecoder = new JsonDecoder(schema, new ByteArrayInputStream(utf8Bytes));

    // Passing null as the reuse argument: no existing record instance is supplied to the reader.
    return datumReader.read(null, jsonDecoder);
  }


  /**
   * Serializes a generic record to JSON text using an explicitly supplied schema.
   *
   * @param record record to serialize
   * @param recordSchema schema used by both the datum writer and the JSON encoder
   * @return the JSON produced by the encoder, decoded from its UTF-8 bytes
   * @throws IOException if writing or flushing the encoder fails
   */
  public static String genericRecordToJson(GenericRecord record,Schema recordSchema) throws IOException {
    final ByteArrayOutputStream jsonBytes = new ByteArrayOutputStream();
    final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(recordSchema);
    final Encoder jsonEncoder = new JsonEncoder(recordSchema, jsonBytes);

    datumWriter.write(record, jsonEncoder);
    // Flush before reading the buffer so everything the encoder has buffered reaches the stream.
    jsonEncoder.flush();

    return new String(jsonBytes.toByteArray(), "UTF-8");
  }

  /**
   * Serializes a generic record to JSON text using the record's own schema.
   *
   * @param record record to serialize
   * @return the JSON text for the record
   * @throws IOException if serialization fails
   */
  public static String genericRecordToJson(GenericRecord record) throws IOException {
    final Schema ownSchema = record.getSchema();
    return genericRecordToJson(record, ownSchema);
  }
}