http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaProjectorTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaProjectorTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaProjectorTest.java
deleted file mode 100644
index 31a6f79..0000000
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaProjectorTest.java
+++ /dev/null
@@ -1,495 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements.  See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.  You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.data.Schema.Type;
-import org.apache.kafka.copycat.errors.DataException;
-import org.apache.kafka.copycat.errors.SchemaProjectorException;
-import org.junit.Test;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public class SchemaProjectorTest {
-
-    @Test
-    public void testPrimitiveTypeProjection() throws Exception {
-        Object projected;
-        projected = SchemaProjector.project(Schema.BOOLEAN_SCHEMA, false, Schema.BOOLEAN_SCHEMA);
-        assertEquals(false, projected);
-
-        byte[] bytes = {(byte) 1, (byte) 2};
-        projected  = SchemaProjector.project(Schema.BYTES_SCHEMA, bytes, Schema.BYTES_SCHEMA);
-        assertEquals(bytes, projected);
-
-        projected = SchemaProjector.project(Schema.STRING_SCHEMA, "abc", Schema.STRING_SCHEMA);
-        assertEquals("abc", projected);
-
-        projected = SchemaProjector.project(Schema.BOOLEAN_SCHEMA, false, Schema.OPTIONAL_BOOLEAN_SCHEMA);
-        assertEquals(false, projected);
-
-        projected  = SchemaProjector.project(Schema.BYTES_SCHEMA, bytes, Schema.OPTIONAL_BYTES_SCHEMA);
-        assertEquals(bytes, projected);
-
-        projected = SchemaProjector.project(Schema.STRING_SCHEMA, "abc", Schema.OPTIONAL_STRING_SCHEMA);
-        assertEquals("abc", projected);
-
-        try {
-            SchemaProjector.project(Schema.OPTIONAL_BOOLEAN_SCHEMA, false, Schema.BOOLEAN_SCHEMA);
-            fail("Cannot project optional schema to schema with no default value.");
-        } catch (DataException e) {
-            // expected
-        }
-
-        try {
-            SchemaProjector.project(Schema.OPTIONAL_BYTES_SCHEMA, bytes, Schema.BYTES_SCHEMA);
-            fail("Cannot project optional schema to schema with no default value.");
-        } catch (DataException e) {
-            // expected
-        }
-
-        try {
-            SchemaProjector.project(Schema.OPTIONAL_STRING_SCHEMA, "abc", Schema.STRING_SCHEMA);
-            fail("Cannot project optional schema to schema with no default value.");
-        } catch (DataException e) {
-            // expected
-        }
-    }
-
-    @Test
-    public void testNumericTypeProjection() throws Exception {
-        Schema[] promotableSchemas = {Schema.INT8_SCHEMA, Schema.INT16_SCHEMA, Schema.INT32_SCHEMA, Schema.INT64_SCHEMA, Schema.FLOAT32_SCHEMA, Schema.FLOAT64_SCHEMA};
-        Schema[] promotableOptionalSchemas = {Schema.OPTIONAL_INT8_SCHEMA, Schema.OPTIONAL_INT16_SCHEMA, Schema.OPTIONAL_INT32_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA,
-                                              Schema.OPTIONAL_FLOAT32_SCHEMA, Schema.OPTIONAL_FLOAT64_SCHEMA};
-
-        Object[] values = {(byte) 127, (short) 255, 32767, 327890L, 1.2F, 1.2345};
-        Map<Object, List<?>> expectedProjected = new HashMap<>();
-        expectedProjected.put(values[0], Arrays.asList((byte) 127, (short) 127, 127, 127L, 127.F, 127.));
-        expectedProjected.put(values[1], Arrays.asList((short) 255, 255, 255L, 255.F, 255.));
-        expectedProjected.put(values[2], Arrays.asList(32767, 32767L, 32767.F, 32767.));
-        expectedProjected.put(values[3], Arrays.asList(327890L, 327890.F, 327890.));
-        expectedProjected.put(values[4], Arrays.asList(1.2F, 1.2));
-        expectedProjected.put(values[5], Arrays.asList(1.2345));
-
-        Object promoted;
-        for (int i = 0; i < promotableSchemas.length; ++i) {
-            Schema source = promotableSchemas[i];
-            List<?> expected = expectedProjected.get(values[i]);
-            for (int j = i; j < promotableSchemas.length; ++j) {
-                Schema target = promotableSchemas[j];
-                promoted = SchemaProjector.project(source, values[i], target);
-                if (target.type() == Type.FLOAT64) {
-                    assertEquals((Double) (expected.get(j - i)), (double) promoted, 1e-6);
-                } else {
-                    assertEquals(expected.get(j - i), promoted);
-                }
-            }
-            for (int j = i; j < promotableOptionalSchemas.length;  ++j) {
-                Schema target = promotableOptionalSchemas[j];
-                promoted = SchemaProjector.project(source, values[i], target);
-                if (target.type() == Type.FLOAT64) {
-                    assertEquals((Double) (expected.get(j - i)), (double) promoted, 1e-6);
-                } else {
-                    assertEquals(expected.get(j - i), promoted);
-                }
-            }
-        }
-
-        for (int i = 0; i < promotableOptionalSchemas.length; ++i) {
-            Schema source = promotableSchemas[i];
-            List<?> expected = expectedProjected.get(values[i]);
-            for (int j = i; j < promotableOptionalSchemas.length;  ++j) {
-                Schema target = promotableOptionalSchemas[j];
-                promoted = SchemaProjector.project(source, values[i], target);
-                if (target.type() == Type.FLOAT64) {
-                    assertEquals((Double) (expected.get(j - i)), (double) promoted, 1e-6);
-                } else {
-                    assertEquals(expected.get(j - i), promoted);
-                }
-            }
-        }
-
-        Schema[] nonPromotableSchemas = {Schema.BOOLEAN_SCHEMA, Schema.BYTES_SCHEMA, Schema.STRING_SCHEMA};
-        for (Schema promotableSchema: promotableSchemas) {
-            for (Schema nonPromotableSchema: nonPromotableSchemas) {
-                Object dummy = new Object();
-                try {
-                    SchemaProjector.project(promotableSchema, dummy, nonPromotableSchema);
-                    fail("Cannot promote " +  promotableSchema.type() + " to " + nonPromotableSchema.type());
-                } catch (DataException e) {
-                    // expected
-                }
-            }
-        }
-    }
-
-    @Test
-    public void testPrimitiveOptionalProjection() throws Exception {
-        verifyOptionalProjection(Schema.OPTIONAL_BOOLEAN_SCHEMA, Type.BOOLEAN, false, true, false, true);
-        verifyOptionalProjection(Schema.OPTIONAL_BOOLEAN_SCHEMA, Type.BOOLEAN, false, true, false, false);
-
-        byte[] bytes = {(byte) 1, (byte) 2};
-        byte[] defaultBytes = {(byte) 3, (byte) 4};
-        verifyOptionalProjection(Schema.OPTIONAL_BYTES_SCHEMA, Type.BYTES, bytes, defaultBytes, bytes, true);
-        verifyOptionalProjection(Schema.OPTIONAL_BYTES_SCHEMA, Type.BYTES, bytes, defaultBytes, bytes, false);
-
-        verifyOptionalProjection(Schema.OPTIONAL_STRING_SCHEMA, Type.STRING, "abc", "def", "abc", true);
-        verifyOptionalProjection(Schema.OPTIONAL_STRING_SCHEMA, Type.STRING, "abc", "def", "abc", false);
-
-        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT8, (byte) 12, (byte) 127, (byte) 12, true);
-        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT8, (byte) 12, (byte) 127, (byte) 12, false);
-        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT16, (byte) 12, (short) 127, (short) 12, true);
-        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT16, (byte) 12, (short) 127, (short) 12, false);
-        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT32, (byte) 12, 12789, 12, true);
-        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT32, (byte) 12, 12789, 12, false);
-        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT64, (byte) 12, 127890L, 12L, true);
-        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT64, (byte) 12, 127890L, 12L, false);
-        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT32, (byte) 12, 3.45F, 12.F, true);
-        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT32, (byte) 12, 3.45F, 12.F, false);
-        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT64, (byte) 12, 3.4567, 12., true);
-        verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT64, (byte) 12, 3.4567, 12., false);
-
-        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT16, (short) 12, (short) 127, (short) 12, true);
-        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT16, (short) 12, (short) 127, (short) 12, false);
-        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT32, (short) 12, 12789, 12, true);
-        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT32, (short) 12, 12789, 12, false);
-        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT64, (short) 12, 127890L, 12L, true);
-        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT64, (short) 12, 127890L, 12L, false);
-        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT32, (short) 12, 3.45F, 12.F, true);
-        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT32, (short) 12, 3.45F, 12.F, false);
-        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT64, (short) 12, 3.4567, 12., true);
-        verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT64, (short) 12, 3.4567, 12., false);
-
-        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT32, 12, 12789, 12, true);
-        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT32, 12, 12789, 12, false);
-        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT64, 12, 127890L, 12L, true);
-        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT64, 12, 127890L, 12L, false);
-        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT32, 12, 3.45F, 12.F, true);
-        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT32, 12, 3.45F, 12.F, false);
-        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT64, 12, 3.4567, 12., true);
-        verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT64, 12, 3.4567, 12., false);
-
-        verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.INT64, 12L, 127890L, 12L, true);
-        verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.INT64, 12L, 127890L, 12L, false);
-        verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT32, 12L, 3.45F, 12.F, true);
-        verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT32, 12L, 3.45F, 12.F, false);
-        verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT64, 12L, 3.4567, 12., true);
-        verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT64, 12L, 3.4567, 12., false);
-
-        verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT32, 12.345F, 3.45F, 12.345F, true);
-        verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT32, 12.345F, 3.45F, 12.345F, false);
-        verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345F, 3.4567, 12.345, true);
-        verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345F, 3.4567, 12.345, false);
-
-        verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345, 3.4567, 12.345, true);
-        verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345, 3.4567, 12.345, false);
-    }
-
-    @Test
-    public void testStructAddField() throws Exception {
-        Schema source = SchemaBuilder.struct()
-                .field("field", Schema.INT32_SCHEMA)
-                .build();
-        Struct sourceStruct = new Struct(source);
-        sourceStruct.put("field", 1);
-
-        Schema target = SchemaBuilder.struct()
-                .field("field", Schema.INT32_SCHEMA)
-                .field("field2", 
SchemaBuilder.int32().defaultValue(123).build())
-                .build();
-
-        Struct targetStruct = (Struct) SchemaProjector.project(source, sourceStruct, target);
-
-
-        assertEquals(1, (int) targetStruct.getInt32("field"));
-        assertEquals(123, (int) targetStruct.getInt32("field2"));
-
-        Schema incompatibleTargetSchema = SchemaBuilder.struct()
-                .field("field", Schema.INT32_SCHEMA)
-                .field("field2", Schema.INT32_SCHEMA)
-                .build();
-
-        try {
-            SchemaProjector.project(source, sourceStruct, incompatibleTargetSchema);
-            fail("Incompatible schema.");
-        } catch (DataException e) {
-            // expected
-        }
-    }
-
-    @Test
-    public void testStructRemoveField() throws Exception {
-        Schema source = SchemaBuilder.struct()
-                .field("field", Schema.INT32_SCHEMA)
-                .field("field2", Schema.INT32_SCHEMA)
-                .build();
-        Struct sourceStruct = new Struct(source);
-        sourceStruct.put("field", 1);
-        sourceStruct.put("field2", 234);
-
-        Schema target = SchemaBuilder.struct()
-                .field("field", Schema.INT32_SCHEMA)
-                .build();
-        Struct targetStruct = (Struct) SchemaProjector.project(source, sourceStruct, target);
-
-        assertEquals(1, targetStruct.get("field"));
-        try {
-            targetStruct.get("field2");
-            fail("field2 is not part of the projected struct");
-        } catch (DataException e) {
-            // expected
-        }
-    }
-
-    @Test
-    public void testStructDefaultValue() throws Exception {
-        Schema source = SchemaBuilder.struct().optional()
-                .field("field", Schema.INT32_SCHEMA)
-                .field("field2", Schema.INT32_SCHEMA)
-                .build();
-
-        SchemaBuilder builder = SchemaBuilder.struct()
-                .field("field", Schema.INT32_SCHEMA)
-                .field("field2", Schema.INT32_SCHEMA);
-
-        Struct defaultStruct = new Struct(builder).put("field", 12).put("field2", 345);
-        builder.defaultValue(defaultStruct);
-        Schema target = builder.build();
-
-        Object projected = SchemaProjector.project(source, null, target);
-        assertEquals(defaultStruct, projected);
-
-        Struct sourceStruct = new Struct(source).put("field", 45).put("field2", 678);
-        Struct targetStruct = (Struct) SchemaProjector.project(source, sourceStruct, target);
-
-        assertEquals(sourceStruct.get("field"), targetStruct.get("field"));
-        assertEquals(sourceStruct.get("field2"), targetStruct.get("field2"));
-    }
-
-    @Test
-    public void testNestedSchemaProjection() throws Exception {
-        Schema sourceFlatSchema = SchemaBuilder.struct()
-                .field("field", Schema.INT32_SCHEMA)
-                .build();
-        Schema targetFlatSchema = SchemaBuilder.struct()
-                .field("field", Schema.INT32_SCHEMA)
-                .field("field2", 
SchemaBuilder.int32().defaultValue(123).build())
-                .build();
-        Schema sourceNestedSchema = SchemaBuilder.struct()
-                .field("first", Schema.INT32_SCHEMA)
-                .field("second", Schema.STRING_SCHEMA)
-                .field("array", 
SchemaBuilder.array(Schema.INT32_SCHEMA).build())
-                .field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, 
Schema.STRING_SCHEMA).build())
-                .field("nested", sourceFlatSchema)
-                .build();
-        Schema targetNestedSchema = SchemaBuilder.struct()
-                .field("first", Schema.INT32_SCHEMA)
-                .field("second", Schema.STRING_SCHEMA)
-                .field("array", 
SchemaBuilder.array(Schema.INT32_SCHEMA).build())
-                .field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, 
Schema.STRING_SCHEMA).build())
-                .field("nested", targetFlatSchema)
-                .build();
-
-        Struct sourceFlatStruct = new Struct(sourceFlatSchema);
-        sourceFlatStruct.put("field", 113);
-
-        Struct sourceNestedStruct = new Struct(sourceNestedSchema);
-        sourceNestedStruct.put("first", 1);
-        sourceNestedStruct.put("second", "abc");
-        sourceNestedStruct.put("array", Arrays.asList(1, 2));
-        sourceNestedStruct.put("map", Collections.singletonMap(5, "def"));
-        sourceNestedStruct.put("nested", sourceFlatStruct);
-
-        Struct targetNestedStruct = (Struct) SchemaProjector.project(sourceNestedSchema, sourceNestedStruct,
-                                                                     targetNestedSchema);
-        assertEquals(1, targetNestedStruct.get("first"));
-        assertEquals("abc", targetNestedStruct.get("second"));
-        assertEquals(Arrays.asList(1, 2), (List<Integer>) targetNestedStruct.get("array"));
-        assertEquals(Collections.singletonMap(5, "def"), (Map<Integer, String>) targetNestedStruct.get("map"));
-
-        Struct projectedStruct = (Struct) targetNestedStruct.get("nested");
-        assertEquals(113, projectedStruct.get("field"));
-        assertEquals(123, projectedStruct.get("field2"));
-    }
-
-    @Test
-    public void testLogicalTypeProjection() throws Exception {
-        Schema[] logicalTypeSchemas = {Decimal.schema(2), Date.SCHEMA, Time.SCHEMA, Timestamp.SCHEMA};
-        Object projected;
-
-        BigDecimal testDecimal = new BigDecimal(new BigInteger("156"), 2);
-        projected = SchemaProjector.project(Decimal.schema(2), testDecimal, Decimal.schema(2));
-        assertEquals(testDecimal, projected);
-
-        projected = SchemaProjector.project(Date.SCHEMA, 1000, Date.SCHEMA);
-        assertEquals(1000, projected);
-
-        projected = SchemaProjector.project(Time.SCHEMA, 231, Time.SCHEMA);
-        assertEquals(231, projected);
-
-        projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA);
-        assertEquals(34567L, projected);
-
-        Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build();
-        for (Schema logicalTypeSchema: logicalTypeSchemas) {
-            try {
-                SchemaProjector.project(logicalTypeSchema, null, Schema.BOOLEAN_SCHEMA);
-                fail("Cannot project logical types to non-logical types.");
-            } catch (SchemaProjectorException e) {
-                // expected
-            }
-
-            try {
-                SchemaProjector.project(logicalTypeSchema, null, namedSchema);
-                fail("Reader name is not a valid logical type name.");
-            } catch (SchemaProjectorException e) {
-                // expected
-            }
-
-            try {
-                SchemaProjector.project(Schema.BOOLEAN_SCHEMA, null, logicalTypeSchema);
-                fail("Cannot project non-logical types to logical types.");
-            } catch (SchemaProjectorException e) {
-                // expected
-            }
-        }
-    }
-
-    @Test
-    public void testArrayProjection() throws Exception {
-        Schema source = SchemaBuilder.array(Schema.INT32_SCHEMA).build();
-
-        Object projected = SchemaProjector.project(source, Arrays.asList(1, 2, 3), source);
-        assertEquals(Arrays.asList(1, 2, 3), (List<Integer>) projected);
-
-        Schema optionalSource = SchemaBuilder.array(Schema.INT32_SCHEMA).optional().build();
-        Schema target = SchemaBuilder.array(Schema.INT32_SCHEMA).defaultValue(Arrays.asList(1, 2, 3)).build();
-        projected = SchemaProjector.project(optionalSource, Arrays.asList(4, 5), target);
-        assertEquals(Arrays.asList(4, 5), (List<Integer>) projected);
-        projected = SchemaProjector.project(optionalSource, null, target);
-        assertEquals(Arrays.asList(1, 2, 3), (List<Integer>) projected);
-
-        Schema promotedTarget = SchemaBuilder.array(Schema.INT64_SCHEMA).defaultValue(Arrays.asList(1L, 2L, 3L)).build();
-        projected = SchemaProjector.project(optionalSource, Arrays.asList(4, 5), promotedTarget);
-        List<Long> expectedProjected = Arrays.asList(4L, 5L);
-        assertEquals(expectedProjected, (List<Long>) projected);
-        projected = SchemaProjector.project(optionalSource, null, promotedTarget);
-        assertEquals(Arrays.asList(1L, 2L, 3L), (List<Long>) projected);
-
-        Schema noDefaultValueTarget = SchemaBuilder.array(Schema.INT32_SCHEMA).build();
-        try {
-            SchemaProjector.project(optionalSource, null, noDefaultValueTarget);
-            fail("Target schema does not provide a default value.");
-        } catch (SchemaProjectorException e) {
-            // expected
-        }
-
-        Schema nonPromotableTarget = SchemaBuilder.array(Schema.BOOLEAN_SCHEMA).build();
-        try {
-            SchemaProjector.project(optionalSource, null, nonPromotableTarget);
-            fail("Neither source type matches target type nor source type can 
be promoted to target type");
-        } catch (SchemaProjectorException e) {
-            // expected
-        }
-    }
-
-    @Test
-    public void testMapProjection() throws Exception {
-        Schema source = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).optional().build();
-
-        Schema target = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).defaultValue(Collections.singletonMap(1, 2)).build();
-        Object projected = SchemaProjector.project(source, Collections.singletonMap(3, 4), target);
-        assertEquals(Collections.singletonMap(3, 4), (Map<Integer, Integer>) projected);
-        projected = SchemaProjector.project(source, null, target);
-        assertEquals(Collections.singletonMap(1, 2), (Map<Integer, Integer>) projected);
-
-        Schema promotedTarget = SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.FLOAT32_SCHEMA).defaultValue(
-                Collections.singletonMap(3L, 4.5F)).build();
-        projected = SchemaProjector.project(source, Collections.singletonMap(3, 4), promotedTarget);
-        assertEquals(Collections.singletonMap(3L, 4.F), (Map<Long, Float>) projected);
-        projected = SchemaProjector.project(source, null, promotedTarget);
-        assertEquals(Collections.singletonMap(3L, 4.5F), (Map<Long, Float>) projected);
-
-        Schema noDefaultValueTarget = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build();
-        try {
-            SchemaProjector.project(source, null, noDefaultValueTarget);
-            fail("Reader does not provide a default value.");
-        } catch (SchemaProjectorException e) {
-            // expected
-        }
-
-        Schema nonPromotableTarget = SchemaBuilder.map(Schema.BOOLEAN_SCHEMA, Schema.STRING_SCHEMA).build();
-        try {
-            SchemaProjector.project(source, null, nonPromotableTarget);
-            fail("Neither source type matches target type nor source type can 
be promoted to target type");
-        } catch (SchemaProjectorException e) {
-            // expected
-        }
-    }
-
-    @Test
-    public void testMaybeCompatible() throws Exception {
-        Schema source = SchemaBuilder.int32().name("source").build();
-        Schema target = SchemaBuilder.int32().name("target").build();
-
-        try {
-            SchemaProjector.project(source, 12, target);
-            fail("Source name and target name mismatch.");
-        } catch (SchemaProjectorException e) {
-            // expected
-        }
-
-        Schema targetWithParameters = SchemaBuilder.int32().parameters(Collections.singletonMap("key", "value"));
-        try {
-            SchemaProjector.project(source, 34, targetWithParameters);
-            fail("Source parameters and target parameters mismatch.");
-        } catch (SchemaProjectorException e) {
-            // expected
-        }
-    }
-
-    private void verifyOptionalProjection(Schema source, Type targetType, Object value, Object defaultValue, Object expectedProjected, boolean optional) {
-        Schema target;
-        assert source.isOptional();
-        assert value != null;
-        if (optional) {
-            target = SchemaBuilder.type(targetType).optional().defaultValue(defaultValue).build();
-        } else {
-            target = SchemaBuilder.type(targetType).defaultValue(defaultValue).build();
-        }
-        Object projected = SchemaProjector.project(source, value, target);
-        if (targetType == Type.FLOAT64) {
-            assertEquals((double) expectedProjected, (double) projected, 1e-6);
-        } else {
-            assertEquals(expectedProjected, projected);
-        }
-
-        projected = SchemaProjector.project(source, null, target);
-        if (optional) {
-            assertEquals(null, projected);
-        } else {
-            assertEquals(defaultValue, projected);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/data/StructTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/StructTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/StructTest.java
deleted file mode 100644
index 162396b..0000000
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/StructTest.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.DataException;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-public class StructTest {
-
-    private static final Schema FLAT_STRUCT_SCHEMA = SchemaBuilder.struct()
-            .field("int8", Schema.INT8_SCHEMA)
-            .field("int16", Schema.INT16_SCHEMA)
-            .field("int32", Schema.INT32_SCHEMA)
-            .field("int64", Schema.INT64_SCHEMA)
-            .field("float32", Schema.FLOAT32_SCHEMA)
-            .field("float64", Schema.FLOAT64_SCHEMA)
-            .field("boolean", Schema.BOOLEAN_SCHEMA)
-            .field("string", Schema.STRING_SCHEMA)
-            .field("bytes", Schema.BYTES_SCHEMA)
-            .build();
-
-    private static final Schema ARRAY_SCHEMA = SchemaBuilder.array(Schema.INT8_SCHEMA).build();
-    private static final Schema MAP_SCHEMA = SchemaBuilder.map(
-            Schema.INT32_SCHEMA,
-            Schema.STRING_SCHEMA
-    ).build();
-    private static final Schema NESTED_CHILD_SCHEMA = SchemaBuilder.struct()
-            .field("int8", Schema.INT8_SCHEMA)
-            .build();
-    private static final Schema NESTED_SCHEMA = SchemaBuilder.struct()
-            .field("array", ARRAY_SCHEMA)
-            .field("map", MAP_SCHEMA)
-            .field("nested", NESTED_CHILD_SCHEMA)
-            .build();
-
-    private static final Schema REQUIRED_FIELD_SCHEMA = Schema.INT8_SCHEMA;
-    private static final Schema OPTIONAL_FIELD_SCHEMA = SchemaBuilder.int8().optional().build();
-    private static final Schema DEFAULT_FIELD_SCHEMA = SchemaBuilder.int8().defaultValue((byte) 0).build();
-
-    @Test
-    public void testFlatStruct() {
-        Struct struct = new Struct(FLAT_STRUCT_SCHEMA)
-                .put("int8", (byte) 12)
-                .put("int16", (short) 12)
-                .put("int32", 12)
-                .put("int64", (long) 12)
-                .put("float32", 12.f)
-                .put("float64", 12.)
-                .put("boolean", true)
-                .put("string", "foobar")
-                .put("bytes", "foobar".getBytes());
-
-        // Test equality, and also the type-specific getters
-        assertEquals((byte) 12, (byte) struct.getInt8("int8"));
-        assertEquals((short) 12, (short) struct.getInt16("int16"));
-        assertEquals(12, (int) struct.getInt32("int32"));
-        assertEquals((long) 12, (long) struct.getInt64("int64"));
-        assertEquals((Float) 12.f, struct.getFloat32("float32"));
-        assertEquals((Double) 12., struct.getFloat64("float64"));
-        assertEquals(true, struct.getBoolean("boolean"));
-        assertEquals("foobar", struct.getString("string"));
-        assertEquals(ByteBuffer.wrap("foobar".getBytes()), ByteBuffer.wrap(struct.getBytes("bytes")));
-
-        struct.validate();
-    }
-
-    @Test
-    public void testComplexStruct() {
-        List<Byte> array = Arrays.asList((byte) 1, (byte) 2);
-        Map<Integer, String> map = Collections.singletonMap(1, "string");
-        Struct struct = new Struct(NESTED_SCHEMA)
-                .put("array", array)
-                .put("map", map)
-                .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", 
(byte) 12));
-
-        // Separate the call to get the array and map to validate the typed get methods work properly
-        List<Byte> arrayExtracted = struct.getArray("array");
-        assertEquals(array, arrayExtracted);
-        Map<Byte, Byte> mapExtracted = struct.getMap("map");
-        assertEquals(map, mapExtracted);
-        assertEquals((byte) 12, struct.getStruct("nested").get("int8"));
-
-        struct.validate();
-    }
-
-
-    // These don't test all the ways validation can fail, just one for each element. See more extensive validation
-    // tests in SchemaTest. These are meant to ensure that we are invoking the same code path and that we do deeper
-    // inspection than just checking the class of the object
-
-    @Test(expected = DataException.class)
-    public void testInvalidFieldType() {
-        new Struct(FLAT_STRUCT_SCHEMA).put("int8", "should fail because this is a string, not int8");
-    }
-
-    @Test(expected = DataException.class)
-    public void testInvalidArrayFieldElements() {
-        new Struct(NESTED_SCHEMA).put("array", Arrays.asList("should fail 
since elements should be int8s"));
-    }
-
-    @Test(expected = DataException.class)
-    public void testInvalidMapKeyElements() {
-        new Struct(NESTED_SCHEMA).put("map", Collections.singletonMap("should 
fail because keys should be int8s", (byte) 12));
-    }
-
-    @Test(expected = DataException.class)
-    public void testInvalidStructFieldSchema() {
-        new Struct(NESTED_SCHEMA).put("nested", new Struct(MAP_SCHEMA));
-    }
-
-    @Test(expected = DataException.class)
-    public void testInvalidStructFieldValue() {
-        new Struct(NESTED_SCHEMA).put("nested", new 
Struct(NESTED_CHILD_SCHEMA));
-    }
-
-
-    @Test(expected = DataException.class)
-    public void testMissingFieldValidation() {
-        // Required int8 field
-        Schema schema = SchemaBuilder.struct().field("field", 
REQUIRED_FIELD_SCHEMA).build();
-        Struct struct = new Struct(schema);
-        struct.validate();
-    }
-
-    @Test
-    public void testMissingOptionalFieldValidation() {
-        Schema schema = SchemaBuilder.struct().field("field", 
OPTIONAL_FIELD_SCHEMA).build();
-        Struct struct = new Struct(schema);
-        struct.validate();
-    }
-
-    @Test
-    public void testMissingFieldWithDefaultValidation() {
-        Schema schema = SchemaBuilder.struct().field("field", 
DEFAULT_FIELD_SCHEMA).build();
-        Struct struct = new Struct(schema);
-        struct.validate();
-    }
-
-
-    @Test
-    public void testEquals() {
-        Struct struct1 = new Struct(FLAT_STRUCT_SCHEMA)
-                .put("int8", (byte) 12)
-                .put("int16", (short) 12)
-                .put("int32", 12)
-                .put("int64", (long) 12)
-                .put("float32", 12.f)
-                .put("float64", 12.)
-                .put("boolean", true)
-                .put("string", "foobar")
-                .put("bytes", ByteBuffer.wrap("foobar".getBytes()));
-        Struct struct2 = new Struct(FLAT_STRUCT_SCHEMA)
-                .put("int8", (byte) 12)
-                .put("int16", (short) 12)
-                .put("int32", 12)
-                .put("int64", (long) 12)
-                .put("float32", 12.f)
-                .put("float64", 12.)
-                .put("boolean", true)
-                .put("string", "foobar")
-                .put("bytes", ByteBuffer.wrap("foobar".getBytes()));
-        Struct struct3 = new Struct(FLAT_STRUCT_SCHEMA)
-                .put("int8", (byte) 12)
-                .put("int16", (short) 12)
-                .put("int32", 12)
-                .put("int64", (long) 12)
-                .put("float32", 12.f)
-                .put("float64", 12.)
-                .put("boolean", true)
-                .put("string", "mismatching string")
-                .put("bytes", ByteBuffer.wrap("foobar".getBytes()));
-
-        assertEquals(struct1, struct2);
-        assertNotEquals(struct1, struct3);
-
-        List<Byte> array = Arrays.asList((byte) 1, (byte) 2);
-        Map<Integer, String> map = Collections.singletonMap(1, "string");
-        struct1 = new Struct(NESTED_SCHEMA)
-                .put("array", array)
-                .put("map", map)
-                .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", 
(byte) 12));
-        List<Byte> array2 = Arrays.asList((byte) 1, (byte) 2);
-        Map<Integer, String> map2 = Collections.singletonMap(1, "string");
-        struct2 = new Struct(NESTED_SCHEMA)
-                .put("array", array2)
-                .put("map", map2)
-                .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", 
(byte) 12));
-        List<Byte> array3 = Arrays.asList((byte) 1, (byte) 2, (byte) 3);
-        Map<Integer, String> map3 = Collections.singletonMap(2, "string");
-        struct3 = new Struct(NESTED_SCHEMA)
-                .put("array", array3)
-                .put("map", map3)
-                .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", 
(byte) 13));
-
-        assertEquals(struct1, struct2);
-        assertNotEquals(struct1, struct3);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimeTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimeTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimeTest.java
deleted file mode 100644
index 8e54cb2..0000000
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimeTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.DataException;
-import org.junit.Test;
-
-import java.util.Calendar;
-import java.util.GregorianCalendar;
-import java.util.TimeZone;
-
-import static org.junit.Assert.assertEquals;
-
-public class TimeTest {
-    private static final GregorianCalendar EPOCH;
-    private static final GregorianCalendar EPOCH_PLUS_DATE_COMPONENT;
-    private static final GregorianCalendar EPOCH_PLUS_TEN_THOUSAND_MILLIS;
-    static {
-        EPOCH = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
-        EPOCH.setTimeZone(TimeZone.getTimeZone("UTC"));
-
-        EPOCH_PLUS_TEN_THOUSAND_MILLIS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
-        EPOCH_PLUS_TEN_THOUSAND_MILLIS.setTimeZone(TimeZone.getTimeZone("UTC"));
-        EPOCH_PLUS_TEN_THOUSAND_MILLIS.add(Calendar.MILLISECOND, 10000);
-
-
-        EPOCH_PLUS_DATE_COMPONENT = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
-        EPOCH_PLUS_DATE_COMPONENT.setTimeZone(TimeZone.getTimeZone("UTC"));
-        EPOCH_PLUS_DATE_COMPONENT.add(Calendar.DATE, 10000);
-    }
-
-    @Test
-    public void testBuilder() {
-        Schema plain = Time.SCHEMA;
-        assertEquals(Time.LOGICAL_NAME, plain.name());
-        assertEquals(1, (Object) plain.version());
-    }
-
-    @Test
-    public void testFromLogical() {
-        assertEquals(0, Time.fromLogical(Time.SCHEMA, EPOCH.getTime()));
-        assertEquals(10000, Time.fromLogical(Time.SCHEMA, EPOCH_PLUS_TEN_THOUSAND_MILLIS.getTime()));
-    }
-
-    @Test(expected = DataException.class)
-    public void testFromLogicalInvalidSchema() {
-        Time.fromLogical(Time.builder().name("invalid").build(), EPOCH.getTime());
-    }
-
-    @Test(expected = DataException.class)
-    public void testFromLogicalInvalidHasDateComponents() {
-        Time.fromLogical(Time.SCHEMA, EPOCH_PLUS_DATE_COMPONENT.getTime());
-    }
-
-    @Test
-    public void testToLogical() {
-        assertEquals(EPOCH.getTime(), Time.toLogical(Time.SCHEMA, 0));
-        assertEquals(EPOCH_PLUS_TEN_THOUSAND_MILLIS.getTime(), Time.toLogical(Time.SCHEMA, 10000));
-    }
-
-    @Test(expected = DataException.class)
-    public void testToLogicalInvalidSchema() {
-        Time.toLogical(Time.builder().name("invalid").build(), 0);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimestampTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimestampTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimestampTest.java
deleted file mode 100644
index cb5454c..0000000
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimestampTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.DataException;
-import org.junit.Test;
-
-import java.util.Calendar;
-import java.util.GregorianCalendar;
-import java.util.TimeZone;
-
-import static org.junit.Assert.assertEquals;
-
-public class TimestampTest {
-    private static final GregorianCalendar EPOCH;
-    private static final GregorianCalendar EPOCH_PLUS_MILLIS;
-
-    private static final int NUM_MILLIS = 2000000000;
-    private static final long TOTAL_MILLIS = ((long) NUM_MILLIS) * 2;
-
-    static {
-        EPOCH = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
-        EPOCH.setTimeZone(TimeZone.getTimeZone("UTC"));
-
-
-        EPOCH_PLUS_MILLIS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
-        EPOCH_PLUS_MILLIS.setTimeZone(TimeZone.getTimeZone("UTC"));
-        EPOCH_PLUS_MILLIS.add(Calendar.MILLISECOND, NUM_MILLIS);
-        EPOCH_PLUS_MILLIS.add(Calendar.MILLISECOND, NUM_MILLIS);
-    }
-
-    @Test
-    public void testBuilder() {
-        Schema plain = Date.SCHEMA;
-        assertEquals(Date.LOGICAL_NAME, plain.name());
-        assertEquals(1, (Object) plain.version());
-    }
-
-    @Test
-    public void testFromLogical() {
-        assertEquals(0L, Timestamp.fromLogical(Timestamp.SCHEMA, EPOCH.getTime()));
-        assertEquals(TOTAL_MILLIS, Timestamp.fromLogical(Timestamp.SCHEMA, EPOCH_PLUS_MILLIS.getTime()));
-    }
-
-    @Test(expected = DataException.class)
-    public void testFromLogicalInvalidSchema() {
-        Timestamp.fromLogical(Timestamp.builder().name("invalid").build(), EPOCH.getTime());
-    }
-
-    @Test
-    public void testToLogical() {
-        assertEquals(EPOCH.getTime(), Timestamp.toLogical(Timestamp.SCHEMA, 0L));
-        assertEquals(EPOCH_PLUS_MILLIS.getTime(), Timestamp.toLogical(Timestamp.SCHEMA, TOTAL_MILLIS));
-    }
-
-    @Test(expected = DataException.class)
-    public void testToLogicalInvalidSchema() {
-        Date.toLogical(Date.builder().name("invalid").build(), 0);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/storage/StringConverterTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/storage/StringConverterTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/storage/StringConverterTest.java
deleted file mode 100644
index 3ea69c1..0000000
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/storage/StringConverterTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.data.SchemaAndValue;
-import org.junit.Test;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Collections;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertArrayEquals;
-
-public class StringConverterTest {
-    private static final String TOPIC = "topic";
-    private static final String SAMPLE_STRING = "a string";
-
-    private StringConverter converter = new StringConverter();
-
-    @Test
-    public void testStringToBytes() throws UnsupportedEncodingException {
-        assertArrayEquals(SAMPLE_STRING.getBytes("UTF8"), 
converter.fromCopycatData(TOPIC, Schema.STRING_SCHEMA, SAMPLE_STRING));
-    }
-
-    @Test
-    public void testNonStringToBytes() throws UnsupportedEncodingException {
-        assertArrayEquals("true".getBytes("UTF8"), 
converter.fromCopycatData(TOPIC, Schema.BOOLEAN_SCHEMA, true));
-    }
-
-    @Test
-    public void testNullToBytes() {
-        assertEquals(null, converter.fromCopycatData(TOPIC, Schema.OPTIONAL_STRING_SCHEMA, null));
-    }
-
-    @Test
-    public void testToBytesIgnoresSchema() throws UnsupportedEncodingException {
-        assertArrayEquals("true".getBytes("UTF8"), converter.fromCopycatData(TOPIC, null, true));
-    }
-
-    @Test
-    public void testToBytesNonUtf8Encoding() throws UnsupportedEncodingException {
-        converter.configure(Collections.singletonMap("converter.encoding", "UTF-16"), true);
-        assertArrayEquals(SAMPLE_STRING.getBytes("UTF-16"), converter.fromCopycatData(TOPIC, Schema.STRING_SCHEMA, SAMPLE_STRING));
-    }
-
-    @Test
-    public void testBytesToString() {
-        SchemaAndValue data = converter.toCopycatData(TOPIC, SAMPLE_STRING.getBytes());
-        assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema());
-        assertEquals(SAMPLE_STRING, data.value());
-    }
-
-    @Test
-    public void testBytesNullToString() {
-        SchemaAndValue data = converter.toCopycatData(TOPIC, null);
-        assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema());
-        assertEquals(null, data.value());
-    }
-
-    @Test
-    public void testBytesToStringNonUtf8Encoding() throws UnsupportedEncodingException {
-        converter.configure(Collections.singletonMap("converter.encoding", "UTF-16"), true);
-        SchemaAndValue data = converter.toCopycatData(TOPIC, SAMPLE_STRING.getBytes("UTF-16"));
-        assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema());
-        assertEquals(SAMPLE_STRING, data.value());
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/util/ConnectorUtilsTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/util/ConnectorUtilsTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/util/ConnectorUtilsTest.java
deleted file mode 100644
index e46967b..0000000
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/util/ConnectorUtilsTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-public class ConnectorUtilsTest {
-
-    private static final List<Integer> FIVE_ELEMENTS = Arrays.asList(1, 2, 3, 4, 5);
-
-    @Test
-    public void testGroupPartitions() {
-
-        List<List<Integer>> grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 1);
-        assertEquals(Arrays.asList(FIVE_ELEMENTS), grouped);
-
-        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 2);
-        assertEquals(Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5)), grouped);
-
-        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 3);
-        assertEquals(Arrays.asList(Arrays.asList(1, 2),
-                Arrays.asList(3, 4),
-                Arrays.asList(5)), grouped);
-
-        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 5);
-        assertEquals(Arrays.asList(Arrays.asList(1),
-                Arrays.asList(2),
-                Arrays.asList(3),
-                Arrays.asList(4),
-                Arrays.asList(5)), grouped);
-
-        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 7);
-        assertEquals(Arrays.asList(Arrays.asList(1),
-                Arrays.asList(2),
-                Arrays.asList(3),
-                Arrays.asList(4),
-                Arrays.asList(5),
-                Collections.EMPTY_LIST,
-                Collections.EMPTY_LIST), grouped);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testGroupPartitionsInvalidCount() {
-        ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 0);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
deleted file mode 100644
index d0d59a8..0000000
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.file;
-
-import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.copycat.connector.Task;
-import org.apache.kafka.copycat.sink.SinkConnector;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Very simple connector that works with the console. This connector supports both source and
- * sink modes via its 'mode' setting.
- */
-public class FileStreamSinkConnector extends SinkConnector {
-    public static final String FILE_CONFIG = "file";
-
-    private String filename;
-
-    @Override
-    public String version() {
-        return AppInfoParser.getVersion();
-    }
-
-    @Override
-    public void start(Map<String, String> props) {
-        filename = props.get(FILE_CONFIG);
-    }
-
-    @Override
-    public Class<? extends Task> taskClass() {
-        return FileStreamSinkTask.class;
-    }
-
-    @Override
-    public List<Map<String, String>> taskConfigs(int maxTasks) {
-        ArrayList<Map<String, String>> configs = new ArrayList<>();
-        for (int i = 0; i < maxTasks; i++) {
-            Map<String, String> config = new HashMap<>();
-            if (filename != null)
-                config.put(FILE_CONFIG, filename);
-            configs.add(config);
-        }
-        return configs;
-    }
-
-    @Override
-    public void stop() {
-        // Nothing to do since FileStreamSinkConnector has no background monitoring.
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
deleted file mode 100644
index f95ef8e..0000000
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.file;
-
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.sink.SinkRecord;
-import org.apache.kafka.copycat.sink.SinkTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.PrintStream;
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * FileStreamSinkTask writes records to stdout or a file.
- */
-public class FileStreamSinkTask extends SinkTask {
-    private static final Logger log = LoggerFactory.getLogger(FileStreamSinkTask.class);
-
-    private String filename;
-    private PrintStream outputStream;
-
-    public FileStreamSinkTask() {
-    }
-
-    // for testing
-    public FileStreamSinkTask(PrintStream outputStream) {
-        filename = null;
-        this.outputStream = outputStream;
-    }
-
-    @Override
-    public String version() {
-        return new FileStreamSinkConnector().version();
-    }
-
-    @Override
-    public void start(Map<String, String> props) {
-        filename = props.get(FileStreamSinkConnector.FILE_CONFIG);
-        if (filename == null) {
-            outputStream = System.out;
-        } else {
-            try {
-                outputStream = new PrintStream(new FileOutputStream(filename, true));
-            } catch (FileNotFoundException e) {
-                throw new CopycatException("Couldn't find or create file for 
FileStreamSinkTask", e);
-            }
-        }
-    }
-
-    @Override
-    public void put(Collection<SinkRecord> sinkRecords) {
-        for (SinkRecord record : sinkRecords) {
-            log.trace("Writing line to {}: {}", logFilename(), record.value());
-            outputStream.println(record.value());
-        }
-    }
-
-    @Override
-    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
-        log.trace("Flushing output stream for {}", logFilename());
-        outputStream.flush();
-    }
-
-    @Override
-    public void stop() {
-        if (outputStream != System.out)
-            outputStream.close();
-    }
-
-    private String logFilename() {
-        return filename == null ? "stdout" : filename;
-    }
-}
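
For readers skimming the removed module: a minimal, self-contained sketch of
driving this sink task by hand (outside a Copycat worker) could look like the
snippet below. The class name SinkTaskSketch, the topic name, and the record
payload are invented for illustration; only the FileStreamSinkTask API calls
come from the code above.

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.copycat.data.Schema;
    import org.apache.kafka.copycat.sink.SinkRecord;

    import java.util.Arrays;
    import java.util.HashMap;

    public class SinkTaskSketch {
        public static void main(String[] args) {
            FileStreamSinkTask task = new FileStreamSinkTask();
            // No 'file' property in the config, so the task falls back to stdout.
            task.start(new HashMap<String, String>());
            task.put(Arrays.asList(
                    new SinkRecord("demo-topic", 0, null, null, Schema.STRING_SCHEMA, "hello", 0)));
            // flush() only flushes the stream; the offsets map is unused by this task.
            task.flush(new HashMap<TopicPartition, OffsetAndMetadata>());
            task.stop();
        }
    }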

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
deleted file mode 100644
index 9021775..0000000
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java
+++ /dev/null
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.file;
-
-import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.copycat.connector.Task;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.source.SourceConnector;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Very simple source connector that reads each line of a file (or of stdin when no
- * file is configured) and publishes it to a single Kafka topic.
- */
-public class FileStreamSourceConnector extends SourceConnector {
-    public static final String TOPIC_CONFIG = "topic";
-    public static final String FILE_CONFIG = "file";
-
-    private String filename;
-    private String topic;
-
-    @Override
-    public String version() {
-        return AppInfoParser.getVersion();
-    }
-
-    @Override
-    public void start(Map<String, String> props) {
-        filename = props.get(FILE_CONFIG);
-        topic = props.get(TOPIC_CONFIG);
-        if (topic == null || topic.isEmpty())
-            throw new CopycatException("FileStreamSourceConnector configuration must include 'topic' setting");
-        if (topic.contains(","))
-            throw new CopycatException("FileStreamSourceConnector should only have a single topic when used as a source.");
-    }
-
-    @Override
-    public Class<? extends Task> taskClass() {
-        return FileStreamSourceTask.class;
-    }
-
-    @Override
-    public List<Map<String, String>> taskConfigs(int maxTasks) {
-        ArrayList<Map<String, String>> configs = new ArrayList<>();
-        // Only one input stream makes sense.
-        Map<String, String> config = new HashMap<>();
-        if (filename != null)
-            config.put(FILE_CONFIG, filename);
-        config.put(TOPIC_CONFIG, topic);
-        configs.add(config);
-        return configs;
-    }
-
-    @Override
-    public void stop() {
-        // Nothing to do since FileStreamSourceConnector has no background monitoring.
-    }
-}
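
Since a single file has to be consumed sequentially, taskConfigs() hands back
exactly one configuration regardless of maxTasks, as the comment in the code
notes. Below is a sketch of exercising that behavior directly; the file path
and topic name are placeholders, not values from the commit:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SourceConnectorSketch {
        public static void main(String[] args) {
            FileStreamSourceConnector connector = new FileStreamSourceConnector();
            Map<String, String> props = new HashMap<>();
            props.put(FileStreamSourceConnector.TOPIC_CONFIG, "demo-topic");   // required
            props.put(FileStreamSourceConnector.FILE_CONFIG, "/tmp/demo.txt"); // optional; stdin if absent
            connector.start(props);
            // Only one input stream makes sense, so the extra capacity is ignored.
            List<Map<String, String>> configs = connector.taskConfigs(4);
            System.out.println(configs.size()); // prints 1
            connector.stop();
        }
    }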

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
deleted file mode 100644
index 2a2cfbc..0000000
--- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java
+++ /dev/null
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.file;
-
-import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.source.SourceRecord;
-import org.apache.kafka.copycat.source.SourceTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * FileStreamSourceTask reads from stdin or a file.
- */
-public class FileStreamSourceTask extends SourceTask {
-    private static final Logger log = LoggerFactory.getLogger(FileStreamSourceTask.class);
-    public static final String FILENAME_FIELD = "filename";
-    public static final String POSITION_FIELD = "position";
-    private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
-
-    private String filename;
-    private InputStream stream;
-    private BufferedReader reader = null;
-    private char[] buffer = new char[1024];
-    private int offset = 0;
-    private String topic = null;
-
-    private Long streamOffset;
-
-    @Override
-    public String version() {
-        return new FileStreamSourceConnector().version();
-    }
-
-    @Override
-    public void start(Map<String, String> props) {
-        filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
-        if (filename == null || filename.isEmpty()) {
-            stream = System.in;
-            // Tracking offset for stdin doesn't make sense
-            streamOffset = null;
-            reader = new BufferedReader(new InputStreamReader(stream));
-        }
-        topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
-        if (topic == null)
-            throw new CopycatException("FileStreamSourceTask config missing topic setting");
-    }
-
-    @Override
-    public List<SourceRecord> poll() throws InterruptedException {
-        if (stream == null) {
-            try {
-                stream = new FileInputStream(filename);
-                Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
-                if (offset != null) {
-                    Object lastRecordedOffset = offset.get(POSITION_FIELD);
-                    if (lastRecordedOffset != null && !(lastRecordedOffset instanceof Long))
-                        throw new CopycatException("Offset position is the incorrect type");
-                    if (lastRecordedOffset != null) {
-                        log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset);
-                        long skipLeft = (Long) lastRecordedOffset;
-                        while (skipLeft > 0) {
-                            try {
-                                long skipped = stream.skip(skipLeft);
-                                skipLeft -= skipped;
-                            } catch (IOException e) {
-                                log.error("Error while trying to seek to previous offset in file: ", e);
-                                throw new CopycatException(e);
-                            }
-                        }
-                        log.debug("Skipped to offset {}", lastRecordedOffset);
-                    }
-                    streamOffset = (lastRecordedOffset != null) ? (Long) lastRecordedOffset : 0L;
-                } else {
-                    streamOffset = 0L;
-                }
-                reader = new BufferedReader(new InputStreamReader(stream));
-                log.debug("Opened {} for reading", logFilename());
-            } catch (FileNotFoundException e) {
-                log.warn("Couldn't find file for FileStreamSourceTask, sleeping to wait for it to be created");
-                synchronized (this) {
-                    this.wait(1000);
-                }
-                return null;
-            }
-        }
-
-        // Unfortunately we can't just use readLine() because it blocks in an uninterruptible way.
-        // Instead we have to manage splitting lines ourselves, using simple backoff when no new data
-        // is available.
-        try {
-            final BufferedReader readerCopy;
-            synchronized (this) {
-                readerCopy = reader;
-            }
-            if (readerCopy == null)
-                return null;
-
-            ArrayList<SourceRecord> records = null;
-
-            int nread = 0;
-            while (readerCopy.ready()) {
-                nread = readerCopy.read(buffer, offset, buffer.length - offset);
-                log.trace("Read {} bytes from {}", nread, logFilename());
-
-                if (nread > 0) {
-                    offset += nread;
-                    if (offset == buffer.length) {
-                        char[] newbuf = new char[buffer.length * 2];
-                        System.arraycopy(buffer, 0, newbuf, 0, buffer.length);
-                        buffer = newbuf;
-                    }
-
-                    String line;
-                    do {
-                        line = extractLine();
-                        if (line != null) {
-                            log.trace("Read a line from {}", logFilename());
-                            if (records == null)
-                                records = new ArrayList<>();
-                            records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, VALUE_SCHEMA, line));
-                        }
-                    } while (line != null);
-                }
-            }
-
-            if (nread <= 0)
-                synchronized (this) {
-                    this.wait(1000);
-                }
-
-            return records;
-        } catch (IOException e) {
-            // Underlying stream was killed, probably as a result of calling stop. Allow to return
-            // null, and driving thread will handle any shutdown if necessary.
-        }
-        return null;
-    }
-
-    private String extractLine() {
-        int until = -1, newStart = -1;
-        for (int i = 0; i < offset; i++) {
-            if (buffer[i] == '\n') {
-                until = i;
-                newStart = i + 1;
-                break;
-            } else if (buffer[i] == '\r') {
-                // We need to check for \r\n, so we must skip this if we can't check the next char
-                if (i + 1 >= offset)
-                    return null;
-
-                until = i;
-                newStart = (buffer[i + 1] == '\n') ? i + 2 : i + 1;
-                break;
-            }
-        }
-
-        if (until != -1) {
-            String result = new String(buffer, 0, until);
-            System.arraycopy(buffer, newStart, buffer, 0, buffer.length - newStart);
-            offset = offset - newStart;
-            if (streamOffset != null)
-                streamOffset += newStart;
-            return result;
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public void stop() {
-        log.trace("Stopping");
-        synchronized (this) {
-            try {
-                if (stream != null && stream != System.in) {
-                    stream.close();
-                    log.trace("Closed input stream");
-                }
-            } catch (IOException e) {
-                log.error("Failed to close FileStreamSourceTask stream: ", e);
-            }
-            this.notify();
-        }
-    }
-
-    private Map<String, String> offsetKey(String filename) {
-        return Collections.singletonMap(FILENAME_FIELD, filename);
-    }
-
-    private Map<String, Long> offsetValue(Long pos) {
-        return Collections.singletonMap(POSITION_FIELD, pos);
-    }
-
-    private String logFilename() {
-        return filename == null ? "stdin" : filename;
-    }
-}
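
The hand-rolled line splitting above exists because, as the comment in poll()
explains, readLine() blocks in an uninterruptible way. The following is a
self-contained rendition of the same growing-buffer/extract loop, trimmed of
task state so it can be run on its own; the class name and the feed() helper
are invented for illustration (the real task refills its buffer from a
BufferedReader instead):

    public class LineExtractorSketch {
        private char[] buffer = new char[16];
        private int offset = 0;

        // Append raw chars, growing the backing array the same way poll() does.
        public void feed(String chunk) {
            while (offset + chunk.length() > buffer.length) {
                char[] newbuf = new char[buffer.length * 2];
                System.arraycopy(buffer, 0, newbuf, 0, offset);
                buffer = newbuf;
            }
            chunk.getChars(0, chunk.length(), buffer, offset);
            offset += chunk.length();
        }

        // Same termination rules as extractLine(): \n, \r\n, or a lone \r; a buffer
        // ending in \r yields nothing until the next char shows whether it is \r\n.
        public String extract() {
            int until = -1, newStart = -1;
            for (int i = 0; i < offset; i++) {
                if (buffer[i] == '\n') {
                    until = i;
                    newStart = i + 1;
                    break;
                } else if (buffer[i] == '\r') {
                    if (i + 1 >= offset)
                        return null;
                    until = i;
                    newStart = (buffer[i + 1] == '\n') ? i + 2 : i + 1;
                    break;
                }
            }
            if (until == -1)
                return null;
            String result = new String(buffer, 0, until);
            System.arraycopy(buffer, newStart, buffer, 0, offset - newStart);
            offset -= newStart;
            return result;
        }

        public static void main(String[] args) {
            LineExtractorSketch lines = new LineExtractorSketch();
            lines.feed("line1\rline2\r\npartial");
            System.out.println(lines.extract()); // line1
            System.out.println(lines.extract()); // line2
            System.out.println(lines.extract()); // null: no terminator seen yet
        }
    }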

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java
deleted file mode 100644
index b30856f..0000000
--- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java
+++ /dev/null
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.file;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.copycat.connector.ConnectorContext;
-import org.apache.kafka.copycat.sink.SinkConnector;
-import org.junit.Before;
-import org.junit.Test;
-import org.powermock.api.easymock.PowerMock;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-public class FileStreamSinkConnectorTest {
-
-    private static final String MULTIPLE_TOPICS = "test1,test2";
-    private static final String[] MULTIPLE_TOPICS_LIST
-            = MULTIPLE_TOPICS.split(",");
-    private static final List<TopicPartition> MULTIPLE_TOPICS_PARTITIONS = Arrays.asList(
-            new TopicPartition("test1", 1), new TopicPartition("test2", 2)
-    );
-    private static final String FILENAME = "/afilename";
-
-    private FileStreamSinkConnector connector;
-    private ConnectorContext ctx;
-    private Map<String, String> sinkProperties;
-
-    @Before
-    public void setup() {
-        connector = new FileStreamSinkConnector();
-        ctx = PowerMock.createMock(ConnectorContext.class);
-        connector.initialize(ctx);
-
-        sinkProperties = new HashMap<>();
-        sinkProperties.put(SinkConnector.TOPICS_CONFIG, MULTIPLE_TOPICS);
-        sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, FILENAME);
-    }
-
-    @Test
-    public void testSinkTasks() {
-        PowerMock.replayAll();
-
-        connector.start(sinkProperties);
-        List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
-        assertEquals(1, taskConfigs.size());
-        assertEquals(FILENAME, taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG));
-
-        taskConfigs = connector.taskConfigs(2);
-        assertEquals(2, taskConfigs.size());
-        for (int i = 0; i < 2; i++) {
-            assertEquals(FILENAME, taskConfigs.get(i).get(FileStreamSinkConnector.FILE_CONFIG));
-        }
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testTaskClass() {
-        PowerMock.replayAll();
-
-        connector.start(sinkProperties);
-        assertEquals(FileStreamSinkTask.class, connector.taskClass());
-
-        PowerMock.verifyAll();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java
deleted file mode 100644
index ac8b5f1..0000000
--- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java
+++ /dev/null
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.file;
-
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.sink.SinkRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.util.Arrays;
-import java.util.HashMap;
-
-import static org.junit.Assert.assertEquals;
-
-public class FileStreamSinkTaskTest {
-
-    private FileStreamSinkTask task;
-    private ByteArrayOutputStream os;
-    private PrintStream printStream;
-
-    @Before
-    public void setup() {
-        os = new ByteArrayOutputStream();
-        printStream = new PrintStream(os);
-        task = new FileStreamSinkTask(printStream);
-    }
-
-    @Test
-    public void testPutFlush() {
-        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
-
-        // We do not call task.start() since it would override the output stream
-
-        task.put(Arrays.asList(
-                new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line1", 1)
-        ));
-        offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(1L));
-        task.flush(offsets);
-        assertEquals("line1\n", os.toString());
-
-        task.put(Arrays.asList(
-                new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line2", 2),
-                new SinkRecord("topic2", 0, null, null, Schema.STRING_SCHEMA, "line3", 1)
-        ));
-        offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(2L));
-        offsets.put(new TopicPartition("topic2", 0), new OffsetAndMetadata(1L));
-        task.flush(offsets);
-        assertEquals("line1\nline2\nline3\n", os.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java
deleted file mode 100644
index 28bfa62..0000000
--- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java
+++ /dev/null
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.file;
-
-import org.apache.kafka.copycat.connector.ConnectorContext;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.junit.Before;
-import org.junit.Test;
-import org.powermock.api.easymock.PowerMock;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-public class FileStreamSourceConnectorTest {
-
-    private static final String SINGLE_TOPIC = "test";
-    private static final String MULTIPLE_TOPICS = "test1,test2";
-    private static final String FILENAME = "/somefilename";
-
-    private FileStreamSourceConnector connector;
-    private ConnectorContext ctx;
-    private Map<String, String> sourceProperties;
-
-    @Before
-    public void setup() {
-        connector = new FileStreamSourceConnector();
-        ctx = PowerMock.createMock(ConnectorContext.class);
-        connector.initialize(ctx);
-
-        sourceProperties = new HashMap<>();
-        sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, SINGLE_TOPIC);
-        sourceProperties.put(FileStreamSourceConnector.FILE_CONFIG, FILENAME);
-    }
-
-    @Test
-    public void testSourceTasks() {
-        PowerMock.replayAll();
-
-        connector.start(sourceProperties);
-        List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
-        assertEquals(1, taskConfigs.size());
-        assertEquals(FILENAME,
-                taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
-        assertEquals(SINGLE_TOPIC,
-                taskConfigs.get(0).get(FileStreamSourceConnector.TOPIC_CONFIG));
-
-        // Should be able to return fewer than requested #
-        taskConfigs = connector.taskConfigs(2);
-        assertEquals(1, taskConfigs.size());
-        assertEquals(FILENAME,
-                taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
-        assertEquals(SINGLE_TOPIC,
-                taskConfigs.get(0).get(FileStreamSourceConnector.TOPIC_CONFIG));
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testSourceTasksStdin() {
-        PowerMock.replayAll();
-
-        sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
-        connector.start(sourceProperties);
-        List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
-        assertEquals(1, taskConfigs.size());
-        assertNull(taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG));
-
-        PowerMock.verifyAll();
-    }
-
-    @Test(expected = CopycatException.class)
-    public void testMultipleSourcesInvalid() {
-        sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, MULTIPLE_TOPICS);
-        connector.start(sourceProperties);
-    }
-
-    @Test
-    public void testTaskClass() {
-        PowerMock.replayAll();
-
-        connector.start(sourceProperties);
-        assertEquals(FileStreamSourceTask.class, connector.taskClass());
-
-        PowerMock.verifyAll();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
deleted file mode 100644
index ddf8e43..0000000
--- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java
+++ /dev/null
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.file;
-
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.source.SourceRecord;
-import org.apache.kafka.copycat.source.SourceTaskContext;
-import org.apache.kafka.copycat.storage.OffsetStorageReader;
-import org.easymock.EasyMock;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.powermock.api.easymock.PowerMock;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-public class FileStreamSourceTaskTest {
-
-    private static final String TOPIC = "test";
-
-    private File tempFile;
-    private Map<String, String> config;
-    private OffsetStorageReader offsetStorageReader;
-    private SourceTaskContext context;
-    private FileStreamSourceTask task;
-
-    private boolean verifyMocks = false;
-
-    @Before
-    public void setup() throws IOException {
-        tempFile = File.createTempFile("file-stream-source-task-test", null);
-        config = new HashMap<>();
-        config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath());
-        config.put(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC);
-        task = new FileStreamSourceTask();
-        offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class);
-        context = PowerMock.createMock(SourceTaskContext.class);
-        task.initialize(context);
-    }
-
-    @After
-    public void teardown() {
-        tempFile.delete();
-
-        if (verifyMocks)
-            PowerMock.verifyAll();
-    }
-
-    private void replay() {
-        PowerMock.replayAll();
-        verifyMocks = true;
-    }
-
-    @Test
-    public void testNormalLifecycle() throws InterruptedException, IOException {
-        expectOffsetLookupReturnNone();
-        replay();
-
-        task.start(config);
-
-        FileOutputStream os = new FileOutputStream(tempFile);
-        assertEquals(null, task.poll());
-        os.write("partial line".getBytes());
-        os.flush();
-        assertEquals(null, task.poll());
-        os.write(" finished\n".getBytes());
-        os.flush();
-        List<SourceRecord> records = task.poll();
-        assertEquals(1, records.size());
-        assertEquals(TOPIC, records.get(0).topic());
-        assertEquals("partial line finished", records.get(0).value());
-        assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
-        assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 22L), records.get(0).sourceOffset());
-        assertEquals(null, task.poll());
-
-        // Different line endings, and make sure the final \r doesn't result in a line until we can
-        // read the subsequent byte.
-        os.write("line1\rline2\r\nline3\nline4\n\r".getBytes());
-        os.flush();
-        records = task.poll();
-        assertEquals(4, records.size());
-        assertEquals("line1", records.get(0).value());
-        assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
-        assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 28L), records.get(0).sourceOffset());
-        assertEquals("line2", records.get(1).value());
-        assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(1).sourcePartition());
-        assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 35L), records.get(1).sourceOffset());
-        assertEquals("line3", records.get(2).value());
-        assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(2).sourcePartition());
-        assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 41L), records.get(2).sourceOffset());
-        assertEquals("line4", records.get(3).value());
-        assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(3).sourcePartition());
-        assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 47L), records.get(3).sourceOffset());
-
-        os.write("subsequent text".getBytes());
-        os.flush();
-        records = task.poll();
-        assertEquals(1, records.size());
-        assertEquals("", records.get(0).value());
-        assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
-        assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 48L), records.get(0).sourceOffset());
-
-        task.stop();
-    }
-
-    @Test(expected = CopycatException.class)
-    public void testMissingTopic() throws InterruptedException {
-        replay();
-
-        config.remove(FileStreamSourceConnector.TOPIC_CONFIG);
-        task.start(config);
-    }
-
-    @Test
-    public void testInvalidFile() throws InterruptedException {
-        config.put(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename");
-        task.start(config);
-        // Currently the task retries indefinitely if the file isn't found, but shouldn't return any data.
-        for (int i = 0; i < 100; i++)
-            assertEquals(null, task.poll());
-    }
-
-    private void expectOffsetLookupReturnNone() {
-        EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader);
-        EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject(Map.class))).andReturn(null);
-    }
-}
\ No newline at end of file
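
As a sanity check on the expected source offsets in testNormalLifecycle, the
positions are simply running byte counts through the file:

    "partial line" + " finished\n" -> 12 + 10 = 22, so the first record's offset is 22
    "line1\r"                      -> 22 + 6  = 28
    "line2\r\n"                    -> 28 + 7  = 35
    "line3\n"                      -> 35 + 6  = 41
    "line4\n"                      -> 41 + 6  = 47
    trailing "\r" + "subsequent"   -> the lone \r only terminates (an empty) line once
                                      the next byte arrives, giving 47 + 1 = 48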
