http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaProjectorTest.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaProjectorTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaProjectorTest.java deleted file mode 100644 index 31a6f79..0000000 --- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaProjectorTest.java +++ /dev/null @@ -1,495 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- **/ - -package org.apache.kafka.copycat.data; - -import org.apache.kafka.copycat.data.Schema.Type; -import org.apache.kafka.copycat.errors.DataException; -import org.apache.kafka.copycat.errors.SchemaProjectorException; -import org.junit.Test; - -import java.math.BigDecimal; -import java.math.BigInteger; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -public class SchemaProjectorTest { - - @Test - public void testPrimitiveTypeProjection() throws Exception { - Object projected; - projected = SchemaProjector.project(Schema.BOOLEAN_SCHEMA, false, Schema.BOOLEAN_SCHEMA); - assertEquals(false, projected); - - byte[] bytes = {(byte) 1, (byte) 2}; - projected = SchemaProjector.project(Schema.BYTES_SCHEMA, bytes, Schema.BYTES_SCHEMA); - assertEquals(bytes, projected); - - projected = SchemaProjector.project(Schema.STRING_SCHEMA, "abc", Schema.STRING_SCHEMA); - assertEquals("abc", projected); - - projected = SchemaProjector.project(Schema.BOOLEAN_SCHEMA, false, Schema.OPTIONAL_BOOLEAN_SCHEMA); - assertEquals(false, projected); - - projected = SchemaProjector.project(Schema.BYTES_SCHEMA, bytes, Schema.OPTIONAL_BYTES_SCHEMA); - assertEquals(bytes, projected); - - projected = SchemaProjector.project(Schema.STRING_SCHEMA, "abc", Schema.OPTIONAL_STRING_SCHEMA); - assertEquals("abc", projected); - - try { - SchemaProjector.project(Schema.OPTIONAL_BOOLEAN_SCHEMA, false, Schema.BOOLEAN_SCHEMA); - fail("Cannot project optional schema to schema with no default value."); - } catch (DataException e) { - // expected - } - - try { - SchemaProjector.project(Schema.OPTIONAL_BYTES_SCHEMA, bytes, Schema.BYTES_SCHEMA); - fail("Cannot project optional schema to schema with no default value."); - } catch (DataException e) { - // expected - } - - try { - SchemaProjector.project(Schema.OPTIONAL_STRING_SCHEMA, "abc", 
Schema.STRING_SCHEMA); - fail("Cannot project optional schema to schema with no default value."); - } catch (DataException e) { - // expected - } - } - - @Test - public void testNumericTypeProjection() throws Exception { - Schema[] promotableSchemas = {Schema.INT8_SCHEMA, Schema.INT16_SCHEMA, Schema.INT32_SCHEMA, Schema.INT64_SCHEMA, Schema.FLOAT32_SCHEMA, Schema.FLOAT64_SCHEMA}; - Schema[] promotableOptionalSchemas = {Schema.OPTIONAL_INT8_SCHEMA, Schema.OPTIONAL_INT16_SCHEMA, Schema.OPTIONAL_INT32_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA, - Schema.OPTIONAL_FLOAT32_SCHEMA, Schema.OPTIONAL_FLOAT64_SCHEMA}; - - Object[] values = {(byte) 127, (short) 255, 32767, 327890L, 1.2F, 1.2345}; - Map<Object, List<?>> expectedProjected = new HashMap<>(); - expectedProjected.put(values[0], Arrays.asList((byte) 127, (short) 127, 127, 127L, 127.F, 127.)); - expectedProjected.put(values[1], Arrays.asList((short) 255, 255, 255L, 255.F, 255.)); - expectedProjected.put(values[2], Arrays.asList(32767, 32767L, 32767.F, 32767.)); - expectedProjected.put(values[3], Arrays.asList(327890L, 327890.F, 327890.)); - expectedProjected.put(values[4], Arrays.asList(1.2F, 1.2)); - expectedProjected.put(values[5], Arrays.asList(1.2345)); - - Object promoted; - for (int i = 0; i < promotableSchemas.length; ++i) { - Schema source = promotableSchemas[i]; - List<?> expected = expectedProjected.get(values[i]); - for (int j = i; j < promotableSchemas.length; ++j) { - Schema target = promotableSchemas[j]; - promoted = SchemaProjector.project(source, values[i], target); - if (target.type() == Type.FLOAT64) { - assertEquals((Double) (expected.get(j - i)), (double) promoted, 1e-6); - } else { - assertEquals(expected.get(j - i), promoted); - } - } - for (int j = i; j < promotableOptionalSchemas.length; ++j) { - Schema target = promotableOptionalSchemas[j]; - promoted = SchemaProjector.project(source, values[i], target); - if (target.type() == Type.FLOAT64) { - assertEquals((Double) (expected.get(j - i)), (double) 
promoted, 1e-6); - } else { - assertEquals(expected.get(j - i), promoted); - } - } - } - - for (int i = 0; i < promotableOptionalSchemas.length; ++i) { - Schema source = promotableSchemas[i]; - List<?> expected = expectedProjected.get(values[i]); - for (int j = i; j < promotableOptionalSchemas.length; ++j) { - Schema target = promotableOptionalSchemas[j]; - promoted = SchemaProjector.project(source, values[i], target); - if (target.type() == Type.FLOAT64) { - assertEquals((Double) (expected.get(j - i)), (double) promoted, 1e-6); - } else { - assertEquals(expected.get(j - i), promoted); - } - } - } - - Schema[] nonPromotableSchemas = {Schema.BOOLEAN_SCHEMA, Schema.BYTES_SCHEMA, Schema.STRING_SCHEMA}; - for (Schema promotableSchema: promotableSchemas) { - for (Schema nonPromotableSchema: nonPromotableSchemas) { - Object dummy = new Object(); - try { - SchemaProjector.project(promotableSchema, dummy, nonPromotableSchema); - fail("Cannot promote " + promotableSchema.type() + " to " + nonPromotableSchema.type()); - } catch (DataException e) { - // expected - } - } - } - } - - @Test - public void testPrimitiveOptionalProjection() throws Exception { - verifyOptionalProjection(Schema.OPTIONAL_BOOLEAN_SCHEMA, Type.BOOLEAN, false, true, false, true); - verifyOptionalProjection(Schema.OPTIONAL_BOOLEAN_SCHEMA, Type.BOOLEAN, false, true, false, false); - - byte[] bytes = {(byte) 1, (byte) 2}; - byte[] defaultBytes = {(byte) 3, (byte) 4}; - verifyOptionalProjection(Schema.OPTIONAL_BYTES_SCHEMA, Type.BYTES, bytes, defaultBytes, bytes, true); - verifyOptionalProjection(Schema.OPTIONAL_BYTES_SCHEMA, Type.BYTES, bytes, defaultBytes, bytes, false); - - verifyOptionalProjection(Schema.OPTIONAL_STRING_SCHEMA, Type.STRING, "abc", "def", "abc", true); - verifyOptionalProjection(Schema.OPTIONAL_STRING_SCHEMA, Type.STRING, "abc", "def", "abc", false); - - verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT8, (byte) 12, (byte) 127, (byte) 12, true); - 
verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT8, (byte) 12, (byte) 127, (byte) 12, false); - verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT16, (byte) 12, (short) 127, (short) 12, true); - verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT16, (byte) 12, (short) 127, (short) 12, false); - verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT32, (byte) 12, 12789, 12, true); - verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT32, (byte) 12, 12789, 12, false); - verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT64, (byte) 12, 127890L, 12L, true); - verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT64, (byte) 12, 127890L, 12L, false); - verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT32, (byte) 12, 3.45F, 12.F, true); - verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT32, (byte) 12, 3.45F, 12.F, false); - verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT64, (byte) 12, 3.4567, 12., true); - verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT64, (byte) 12, 3.4567, 12., false); - - verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT16, (short) 12, (short) 127, (short) 12, true); - verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT16, (short) 12, (short) 127, (short) 12, false); - verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT32, (short) 12, 12789, 12, true); - verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT32, (short) 12, 12789, 12, false); - verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT64, (short) 12, 127890L, 12L, true); - verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT64, (short) 12, 127890L, 12L, false); - verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT32, (short) 12, 3.45F, 12.F, true); - verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT32, (short) 12, 3.45F, 12.F, false); - 
verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT64, (short) 12, 3.4567, 12., true); - verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT64, (short) 12, 3.4567, 12., false); - - verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT32, 12, 12789, 12, true); - verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT32, 12, 12789, 12, false); - verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT64, 12, 127890L, 12L, true); - verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT64, 12, 127890L, 12L, false); - verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT32, 12, 3.45F, 12.F, true); - verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT32, 12, 3.45F, 12.F, false); - verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT64, 12, 3.4567, 12., true); - verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT64, 12, 3.4567, 12., false); - - verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.INT64, 12L, 127890L, 12L, true); - verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.INT64, 12L, 127890L, 12L, false); - verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT32, 12L, 3.45F, 12.F, true); - verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT32, 12L, 3.45F, 12.F, false); - verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT64, 12L, 3.4567, 12., true); - verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT64, 12L, 3.4567, 12., false); - - verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT32, 12.345F, 3.45F, 12.345F, true); - verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT32, 12.345F, 3.45F, 12.345F, false); - verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345F, 3.4567, 12.345, true); - verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345F, 3.4567, 12.345, false); - - 
verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345, 3.4567, 12.345, true); - verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345, 3.4567, 12.345, false); - } - - @Test - public void testStructAddField() throws Exception { - Schema source = SchemaBuilder.struct() - .field("field", Schema.INT32_SCHEMA) - .build(); - Struct sourceStruct = new Struct(source); - sourceStruct.put("field", 1); - - Schema target = SchemaBuilder.struct() - .field("field", Schema.INT32_SCHEMA) - .field("field2", SchemaBuilder.int32().defaultValue(123).build()) - .build(); - - Struct targetStruct = (Struct) SchemaProjector.project(source, sourceStruct, target); - - - assertEquals(1, (int) targetStruct.getInt32("field")); - assertEquals(123, (int) targetStruct.getInt32("field2")); - - Schema incompatibleTargetSchema = SchemaBuilder.struct() - .field("field", Schema.INT32_SCHEMA) - .field("field2", Schema.INT32_SCHEMA) - .build(); - - try { - SchemaProjector.project(source, sourceStruct, incompatibleTargetSchema); - fail("Incompatible schema."); - } catch (DataException e) { - // expected - } - } - - @Test - public void testStructRemoveField() throws Exception { - Schema source = SchemaBuilder.struct() - .field("field", Schema.INT32_SCHEMA) - .field("field2", Schema.INT32_SCHEMA) - .build(); - Struct sourceStruct = new Struct(source); - sourceStruct.put("field", 1); - sourceStruct.put("field2", 234); - - Schema target = SchemaBuilder.struct() - .field("field", Schema.INT32_SCHEMA) - .build(); - Struct targetStruct = (Struct) SchemaProjector.project(source, sourceStruct, target); - - assertEquals(1, targetStruct.get("field")); - try { - targetStruct.get("field2"); - fail("field2 is not part of the projected struct"); - } catch (DataException e) { - // expected - } - } - - @Test - public void testStructDefaultValue() throws Exception { - Schema source = SchemaBuilder.struct().optional() - .field("field", Schema.INT32_SCHEMA) - .field("field2", 
Schema.INT32_SCHEMA) - .build(); - - SchemaBuilder builder = SchemaBuilder.struct() - .field("field", Schema.INT32_SCHEMA) - .field("field2", Schema.INT32_SCHEMA); - - Struct defaultStruct = new Struct(builder).put("field", 12).put("field2", 345); - builder.defaultValue(defaultStruct); - Schema target = builder.build(); - - Object projected = SchemaProjector.project(source, null, target); - assertEquals(defaultStruct, projected); - - Struct sourceStruct = new Struct(source).put("field", 45).put("field2", 678); - Struct targetStruct = (Struct) SchemaProjector.project(source, sourceStruct, target); - - assertEquals(sourceStruct.get("field"), targetStruct.get("field")); - assertEquals(sourceStruct.get("field2"), targetStruct.get("field2")); - } - - @Test - public void testNestedSchemaProjection() throws Exception { - Schema sourceFlatSchema = SchemaBuilder.struct() - .field("field", Schema.INT32_SCHEMA) - .build(); - Schema targetFlatSchema = SchemaBuilder.struct() - .field("field", Schema.INT32_SCHEMA) - .field("field2", SchemaBuilder.int32().defaultValue(123).build()) - .build(); - Schema sourceNestedSchema = SchemaBuilder.struct() - .field("first", Schema.INT32_SCHEMA) - .field("second", Schema.STRING_SCHEMA) - .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build()) - .field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build()) - .field("nested", sourceFlatSchema) - .build(); - Schema targetNestedSchema = SchemaBuilder.struct() - .field("first", Schema.INT32_SCHEMA) - .field("second", Schema.STRING_SCHEMA) - .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build()) - .field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build()) - .field("nested", targetFlatSchema) - .build(); - - Struct sourceFlatStruct = new Struct(sourceFlatSchema); - sourceFlatStruct.put("field", 113); - - Struct sourceNestedStruct = new Struct(sourceNestedSchema); - sourceNestedStruct.put("first", 1); - 
sourceNestedStruct.put("second", "abc"); - sourceNestedStruct.put("array", Arrays.asList(1, 2)); - sourceNestedStruct.put("map", Collections.singletonMap(5, "def")); - sourceNestedStruct.put("nested", sourceFlatStruct); - - Struct targetNestedStruct = (Struct) SchemaProjector.project(sourceNestedSchema, sourceNestedStruct, - targetNestedSchema); - assertEquals(1, targetNestedStruct.get("first")); - assertEquals("abc", targetNestedStruct.get("second")); - assertEquals(Arrays.asList(1, 2), (List<Integer>) targetNestedStruct.get("array")); - assertEquals(Collections.singletonMap(5, "def"), (Map<Integer, String>) targetNestedStruct.get("map")); - - Struct projectedStruct = (Struct) targetNestedStruct.get("nested"); - assertEquals(113, projectedStruct.get("field")); - assertEquals(123, projectedStruct.get("field2")); - } - - @Test - public void testLogicalTypeProjection() throws Exception { - Schema[] logicalTypeSchemas = {Decimal.schema(2), Date.SCHEMA, Time.SCHEMA, Timestamp.SCHEMA}; - Object projected; - - BigDecimal testDecimal = new BigDecimal(new BigInteger("156"), 2); - projected = SchemaProjector.project(Decimal.schema(2), testDecimal, Decimal.schema(2)); - assertEquals(testDecimal, projected); - - projected = SchemaProjector.project(Date.SCHEMA, 1000, Date.SCHEMA); - assertEquals(1000, projected); - - projected = SchemaProjector.project(Time.SCHEMA, 231, Time.SCHEMA); - assertEquals(231, projected); - - projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA); - assertEquals(34567L, projected); - - Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build(); - for (Schema logicalTypeSchema: logicalTypeSchemas) { - try { - SchemaProjector.project(logicalTypeSchema, null, Schema.BOOLEAN_SCHEMA); - fail("Cannot project logical types to non-logical types."); - } catch (SchemaProjectorException e) { - // expected - } - - try { - SchemaProjector.project(logicalTypeSchema, null, namedSchema); - fail("Reader name is not a 
valid logical type name."); - } catch (SchemaProjectorException e) { - // expected - } - - try { - SchemaProjector.project(Schema.BOOLEAN_SCHEMA, null, logicalTypeSchema); - fail("Cannot project non-logical types to logical types."); - } catch (SchemaProjectorException e) { - // expected - } - } - } - - @Test - public void testArrayProjection() throws Exception { - Schema source = SchemaBuilder.array(Schema.INT32_SCHEMA).build(); - - Object projected = SchemaProjector.project(source, Arrays.asList(1, 2, 3), source); - assertEquals(Arrays.asList(1, 2, 3), (List<Integer>) projected); - - Schema optionalSource = SchemaBuilder.array(Schema.INT32_SCHEMA).optional().build(); - Schema target = SchemaBuilder.array(Schema.INT32_SCHEMA).defaultValue(Arrays.asList(1, 2, 3)).build(); - projected = SchemaProjector.project(optionalSource, Arrays.asList(4, 5), target); - assertEquals(Arrays.asList(4, 5), (List<Integer>) projected); - projected = SchemaProjector.project(optionalSource, null, target); - assertEquals(Arrays.asList(1, 2, 3), (List<Integer>) projected); - - Schema promotedTarget = SchemaBuilder.array(Schema.INT64_SCHEMA).defaultValue(Arrays.asList(1L, 2L, 3L)).build(); - projected = SchemaProjector.project(optionalSource, Arrays.asList(4, 5), promotedTarget); - List<Long> expectedProjected = Arrays.asList(4L, 5L); - assertEquals(expectedProjected, (List<Long>) projected); - projected = SchemaProjector.project(optionalSource, null, promotedTarget); - assertEquals(Arrays.asList(1L, 2L, 3L), (List<Long>) projected); - - Schema noDefaultValueTarget = SchemaBuilder.array(Schema.INT32_SCHEMA).build(); - try { - SchemaProjector.project(optionalSource, null, noDefaultValueTarget); - fail("Target schema does not provide a default value."); - } catch (SchemaProjectorException e) { - // expected - } - - Schema nonPromotableTarget = SchemaBuilder.array(Schema.BOOLEAN_SCHEMA).build(); - try { - SchemaProjector.project(optionalSource, null, nonPromotableTarget); - fail("Neither 
source type matches target type nor source type can be promoted to target type"); - } catch (SchemaProjectorException e) { - // expected - } - } - - @Test - public void testMapProjection() throws Exception { - Schema source = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).optional().build(); - - Schema target = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).defaultValue(Collections.singletonMap(1, 2)).build(); - Object projected = SchemaProjector.project(source, Collections.singletonMap(3, 4), target); - assertEquals(Collections.singletonMap(3, 4), (Map<Integer, Integer>) projected); - projected = SchemaProjector.project(source, null, target); - assertEquals(Collections.singletonMap(1, 2), (Map<Integer, Integer>) projected); - - Schema promotedTarget = SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.FLOAT32_SCHEMA).defaultValue( - Collections.singletonMap(3L, 4.5F)).build(); - projected = SchemaProjector.project(source, Collections.singletonMap(3, 4), promotedTarget); - assertEquals(Collections.singletonMap(3L, 4.F), (Map<Long, Float>) projected); - projected = SchemaProjector.project(source, null, promotedTarget); - assertEquals(Collections.singletonMap(3L, 4.5F), (Map<Long, Float>) projected); - - Schema noDefaultValueTarget = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build(); - try { - SchemaProjector.project(source, null, noDefaultValueTarget); - fail("Reader does not provide a default value."); - } catch (SchemaProjectorException e) { - // expected - } - - Schema nonPromotableTarget = SchemaBuilder.map(Schema.BOOLEAN_SCHEMA, Schema.STRING_SCHEMA).build(); - try { - SchemaProjector.project(source, null, nonPromotableTarget); - fail("Neither source type matches target type nor source type can be promoted to target type"); - } catch (SchemaProjectorException e) { - // expected - } - } - - @Test - public void testMaybeCompatible() throws Exception { - Schema source = SchemaBuilder.int32().name("source").build(); - Schema 
target = SchemaBuilder.int32().name("target").build(); - - try { - SchemaProjector.project(source, 12, target); - fail("Source name and target name mismatch."); - } catch (SchemaProjectorException e) { - // expected - } - - Schema targetWithParameters = SchemaBuilder.int32().parameters(Collections.singletonMap("key", "value")); - try { - SchemaProjector.project(source, 34, targetWithParameters); - fail("Source parameters and target parameters mismatch."); - } catch (SchemaProjectorException e) { - // expected - } - } - - private void verifyOptionalProjection(Schema source, Type targetType, Object value, Object defaultValue, Object expectedProjected, boolean optional) { - Schema target; - assert source.isOptional(); - assert value != null; - if (optional) { - target = SchemaBuilder.type(targetType).optional().defaultValue(defaultValue).build(); - } else { - target = SchemaBuilder.type(targetType).defaultValue(defaultValue).build(); - } - Object projected = SchemaProjector.project(source, value, target); - if (targetType == Type.FLOAT64) { - assertEquals((double) expectedProjected, (double) projected, 1e-6); - } else { - assertEquals(expectedProjected, projected); - } - - projected = SchemaProjector.project(source, null, target); - if (optional) { - assertEquals(null, projected); - } else { - assertEquals(defaultValue, projected); - } - } -}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/data/StructTest.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/StructTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/StructTest.java deleted file mode 100644 index 162396b..0000000 --- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/StructTest.java +++ /dev/null @@ -1,222 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ - -package org.apache.kafka.copycat.data; - -import org.apache.kafka.copycat.errors.DataException; -import org.junit.Test; - -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; - -public class StructTest { - - private static final Schema FLAT_STRUCT_SCHEMA = SchemaBuilder.struct() - .field("int8", Schema.INT8_SCHEMA) - .field("int16", Schema.INT16_SCHEMA) - .field("int32", Schema.INT32_SCHEMA) - .field("int64", Schema.INT64_SCHEMA) - .field("float32", Schema.FLOAT32_SCHEMA) - .field("float64", Schema.FLOAT64_SCHEMA) - .field("boolean", Schema.BOOLEAN_SCHEMA) - .field("string", Schema.STRING_SCHEMA) - .field("bytes", Schema.BYTES_SCHEMA) - .build(); - - private static final Schema ARRAY_SCHEMA = SchemaBuilder.array(Schema.INT8_SCHEMA).build(); - private static final Schema MAP_SCHEMA = SchemaBuilder.map( - Schema.INT32_SCHEMA, - Schema.STRING_SCHEMA - ).build(); - private static final Schema NESTED_CHILD_SCHEMA = SchemaBuilder.struct() - .field("int8", Schema.INT8_SCHEMA) - .build(); - private static final Schema NESTED_SCHEMA = SchemaBuilder.struct() - .field("array", ARRAY_SCHEMA) - .field("map", MAP_SCHEMA) - .field("nested", NESTED_CHILD_SCHEMA) - .build(); - - private static final Schema REQUIRED_FIELD_SCHEMA = Schema.INT8_SCHEMA; - private static final Schema OPTIONAL_FIELD_SCHEMA = SchemaBuilder.int8().optional().build(); - private static final Schema DEFAULT_FIELD_SCHEMA = SchemaBuilder.int8().defaultValue((byte) 0).build(); - - @Test - public void testFlatStruct() { - Struct struct = new Struct(FLAT_STRUCT_SCHEMA) - .put("int8", (byte) 12) - .put("int16", (short) 12) - .put("int32", 12) - .put("int64", (long) 12) - .put("float32", 12.f) - .put("float64", 12.) 
- .put("boolean", true) - .put("string", "foobar") - .put("bytes", "foobar".getBytes()); - - // Test equality, and also the type-specific getters - assertEquals((byte) 12, (byte) struct.getInt8("int8")); - assertEquals((short) 12, (short) struct.getInt16("int16")); - assertEquals(12, (int) struct.getInt32("int32")); - assertEquals((long) 12, (long) struct.getInt64("int64")); - assertEquals((Float) 12.f, struct.getFloat32("float32")); - assertEquals((Double) 12., struct.getFloat64("float64")); - assertEquals(true, struct.getBoolean("boolean")); - assertEquals("foobar", struct.getString("string")); - assertEquals(ByteBuffer.wrap("foobar".getBytes()), ByteBuffer.wrap(struct.getBytes("bytes"))); - - struct.validate(); - } - - @Test - public void testComplexStruct() { - List<Byte> array = Arrays.asList((byte) 1, (byte) 2); - Map<Integer, String> map = Collections.singletonMap(1, "string"); - Struct struct = new Struct(NESTED_SCHEMA) - .put("array", array) - .put("map", map) - .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", (byte) 12)); - - // Separate the call to get the array and map to validate the typed get methods work properly - List<Byte> arrayExtracted = struct.getArray("array"); - assertEquals(array, arrayExtracted); - Map<Byte, Byte> mapExtracted = struct.getMap("map"); - assertEquals(map, mapExtracted); - assertEquals((byte) 12, struct.getStruct("nested").get("int8")); - - struct.validate(); - } - - - // These don't test all the ways validation can fail, just one for each element. See more extensive validation - // tests in SchemaTest. 
These are meant to ensure that we are invoking the same code path and that we do deeper - // inspection than just checking the class of the object - - @Test(expected = DataException.class) - public void testInvalidFieldType() { - new Struct(FLAT_STRUCT_SCHEMA).put("int8", "should fail because this is a string, not int8"); - } - - @Test(expected = DataException.class) - public void testInvalidArrayFieldElements() { - new Struct(NESTED_SCHEMA).put("array", Arrays.asList("should fail since elements should be int8s")); - } - - @Test(expected = DataException.class) - public void testInvalidMapKeyElements() { - new Struct(NESTED_SCHEMA).put("map", Collections.singletonMap("should fail because keys should be int8s", (byte) 12)); - } - - @Test(expected = DataException.class) - public void testInvalidStructFieldSchema() { - new Struct(NESTED_SCHEMA).put("nested", new Struct(MAP_SCHEMA)); - } - - @Test(expected = DataException.class) - public void testInvalidStructFieldValue() { - new Struct(NESTED_SCHEMA).put("nested", new Struct(NESTED_CHILD_SCHEMA)); - } - - - @Test(expected = DataException.class) - public void testMissingFieldValidation() { - // Required int8 field - Schema schema = SchemaBuilder.struct().field("field", REQUIRED_FIELD_SCHEMA).build(); - Struct struct = new Struct(schema); - struct.validate(); - } - - @Test - public void testMissingOptionalFieldValidation() { - Schema schema = SchemaBuilder.struct().field("field", OPTIONAL_FIELD_SCHEMA).build(); - Struct struct = new Struct(schema); - struct.validate(); - } - - @Test - public void testMissingFieldWithDefaultValidation() { - Schema schema = SchemaBuilder.struct().field("field", DEFAULT_FIELD_SCHEMA).build(); - Struct struct = new Struct(schema); - struct.validate(); - } - - - @Test - public void testEquals() { - Struct struct1 = new Struct(FLAT_STRUCT_SCHEMA) - .put("int8", (byte) 12) - .put("int16", (short) 12) - .put("int32", 12) - .put("int64", (long) 12) - .put("float32", 12.f) - .put("float64", 12.) 
- .put("boolean", true) - .put("string", "foobar") - .put("bytes", ByteBuffer.wrap("foobar".getBytes())); - Struct struct2 = new Struct(FLAT_STRUCT_SCHEMA) - .put("int8", (byte) 12) - .put("int16", (short) 12) - .put("int32", 12) - .put("int64", (long) 12) - .put("float32", 12.f) - .put("float64", 12.) - .put("boolean", true) - .put("string", "foobar") - .put("bytes", ByteBuffer.wrap("foobar".getBytes())); - Struct struct3 = new Struct(FLAT_STRUCT_SCHEMA) - .put("int8", (byte) 12) - .put("int16", (short) 12) - .put("int32", 12) - .put("int64", (long) 12) - .put("float32", 12.f) - .put("float64", 12.) - .put("boolean", true) - .put("string", "mismatching string") - .put("bytes", ByteBuffer.wrap("foobar".getBytes())); - - assertEquals(struct1, struct2); - assertNotEquals(struct1, struct3); - - List<Byte> array = Arrays.asList((byte) 1, (byte) 2); - Map<Integer, String> map = Collections.singletonMap(1, "string"); - struct1 = new Struct(NESTED_SCHEMA) - .put("array", array) - .put("map", map) - .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", (byte) 12)); - List<Byte> array2 = Arrays.asList((byte) 1, (byte) 2); - Map<Integer, String> map2 = Collections.singletonMap(1, "string"); - struct2 = new Struct(NESTED_SCHEMA) - .put("array", array2) - .put("map", map2) - .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", (byte) 12)); - List<Byte> array3 = Arrays.asList((byte) 1, (byte) 2, (byte) 3); - Map<Integer, String> map3 = Collections.singletonMap(2, "string"); - struct3 = new Struct(NESTED_SCHEMA) - .put("array", array3) - .put("map", map3) - .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", (byte) 13)); - - assertEquals(struct1, struct2); - assertNotEquals(struct1, struct3); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimeTest.java ---------------------------------------------------------------------- diff --git 
a/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimeTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimeTest.java deleted file mode 100644 index 8e54cb2..0000000 --- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimeTest.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ - -package org.apache.kafka.copycat.data; - -import org.apache.kafka.copycat.errors.DataException; -import org.junit.Test; - -import java.util.Calendar; -import java.util.GregorianCalendar; -import java.util.TimeZone; - -import static org.junit.Assert.assertEquals; - -public class TimeTest { - private static final GregorianCalendar EPOCH; - private static final GregorianCalendar EPOCH_PLUS_DATE_COMPONENT; - private static final GregorianCalendar EPOCH_PLUS_TEN_THOUSAND_MILLIS; - static { - EPOCH = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); - EPOCH.setTimeZone(TimeZone.getTimeZone("UTC")); - - EPOCH_PLUS_TEN_THOUSAND_MILLIS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); - EPOCH_PLUS_TEN_THOUSAND_MILLIS.setTimeZone(TimeZone.getTimeZone("UTC")); - EPOCH_PLUS_TEN_THOUSAND_MILLIS.add(Calendar.MILLISECOND, 10000); - - - EPOCH_PLUS_DATE_COMPONENT = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); - EPOCH_PLUS_DATE_COMPONENT.setTimeZone(TimeZone.getTimeZone("UTC")); - EPOCH_PLUS_DATE_COMPONENT.add(Calendar.DATE, 10000); - } - - @Test - public void testBuilder() { - Schema plain = Time.SCHEMA; - assertEquals(Time.LOGICAL_NAME, plain.name()); - assertEquals(1, (Object) plain.version()); - } - - @Test - public void testFromLogical() { - assertEquals(0, Time.fromLogical(Time.SCHEMA, EPOCH.getTime())); - assertEquals(10000, Time.fromLogical(Time.SCHEMA, EPOCH_PLUS_TEN_THOUSAND_MILLIS.getTime())); - } - - @Test(expected = DataException.class) - public void testFromLogicalInvalidSchema() { - Time.fromLogical(Time.builder().name("invalid").build(), EPOCH.getTime()); - } - - @Test(expected = DataException.class) - public void testFromLogicalInvalidHasDateComponents() { - Time.fromLogical(Time.SCHEMA, EPOCH_PLUS_DATE_COMPONENT.getTime()); - } - - @Test - public void testToLogical() { - assertEquals(EPOCH.getTime(), Time.toLogical(Time.SCHEMA, 0)); - assertEquals(EPOCH_PLUS_TEN_THOUSAND_MILLIS.getTime(), Time.toLogical(Time.SCHEMA, 
10000)); - } - - @Test(expected = DataException.class) - public void testToLogicalInvalidSchema() { - Time.toLogical(Time.builder().name("invalid").build(), 0); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimestampTest.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimestampTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimestampTest.java deleted file mode 100644 index cb5454c..0000000 --- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimestampTest.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ - -package org.apache.kafka.copycat.data; - -import org.apache.kafka.copycat.errors.DataException; -import org.junit.Test; - -import java.util.Calendar; -import java.util.GregorianCalendar; -import java.util.TimeZone; - -import static org.junit.Assert.assertEquals; - -public class TimestampTest { - private static final GregorianCalendar EPOCH; - private static final GregorianCalendar EPOCH_PLUS_MILLIS; - - private static final int NUM_MILLIS = 2000000000; - private static final long TOTAL_MILLIS = ((long) NUM_MILLIS) * 2; - - static { - EPOCH = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); - EPOCH.setTimeZone(TimeZone.getTimeZone("UTC")); - - - EPOCH_PLUS_MILLIS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); - EPOCH_PLUS_MILLIS.setTimeZone(TimeZone.getTimeZone("UTC")); - EPOCH_PLUS_MILLIS.add(Calendar.MILLISECOND, NUM_MILLIS); - EPOCH_PLUS_MILLIS.add(Calendar.MILLISECOND, NUM_MILLIS); - } - - @Test - public void testBuilder() { - Schema plain = Date.SCHEMA; - assertEquals(Date.LOGICAL_NAME, plain.name()); - assertEquals(1, (Object) plain.version()); - } - - @Test - public void testFromLogical() { - assertEquals(0L, Timestamp.fromLogical(Timestamp.SCHEMA, EPOCH.getTime())); - assertEquals(TOTAL_MILLIS, Timestamp.fromLogical(Timestamp.SCHEMA, EPOCH_PLUS_MILLIS.getTime())); - } - - @Test(expected = DataException.class) - public void testFromLogicalInvalidSchema() { - Timestamp.fromLogical(Timestamp.builder().name("invalid").build(), EPOCH.getTime()); - } - - @Test - public void testToLogical() { - assertEquals(EPOCH.getTime(), Timestamp.toLogical(Timestamp.SCHEMA, 0L)); - assertEquals(EPOCH_PLUS_MILLIS.getTime(), Timestamp.toLogical(Timestamp.SCHEMA, TOTAL_MILLIS)); - } - - @Test(expected = DataException.class) - public void testToLogicalInvalidSchema() { - Date.toLogical(Date.builder().name("invalid").build(), 0); - } -} 
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/storage/StringConverterTest.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/storage/StringConverterTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/storage/StringConverterTest.java deleted file mode 100644 index 3ea69c1..0000000 --- a/copycat/api/src/test/java/org/apache/kafka/copycat/storage/StringConverterTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ - -package org.apache.kafka.copycat.storage; - -import org.apache.kafka.copycat.data.Schema; -import org.apache.kafka.copycat.data.SchemaAndValue; -import org.junit.Test; - -import java.io.UnsupportedEncodingException; -import java.util.Collections; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertArrayEquals; - -public class StringConverterTest { - private static final String TOPIC = "topic"; - private static final String SAMPLE_STRING = "a string"; - - private StringConverter converter = new StringConverter(); - - @Test - public void testStringToBytes() throws UnsupportedEncodingException { - assertArrayEquals(SAMPLE_STRING.getBytes("UTF8"), converter.fromCopycatData(TOPIC, Schema.STRING_SCHEMA, SAMPLE_STRING)); - } - - @Test - public void testNonStringToBytes() throws UnsupportedEncodingException { - assertArrayEquals("true".getBytes("UTF8"), converter.fromCopycatData(TOPIC, Schema.BOOLEAN_SCHEMA, true)); - } - - @Test - public void testNullToBytes() { - assertEquals(null, converter.fromCopycatData(TOPIC, Schema.OPTIONAL_STRING_SCHEMA, null)); - } - - @Test - public void testToBytesIgnoresSchema() throws UnsupportedEncodingException { - assertArrayEquals("true".getBytes("UTF8"), converter.fromCopycatData(TOPIC, null, true)); - } - - @Test - public void testToBytesNonUtf8Encoding() throws UnsupportedEncodingException { - converter.configure(Collections.singletonMap("converter.encoding", "UTF-16"), true); - assertArrayEquals(SAMPLE_STRING.getBytes("UTF-16"), converter.fromCopycatData(TOPIC, Schema.STRING_SCHEMA, SAMPLE_STRING)); - } - - @Test - public void testBytesToString() { - SchemaAndValue data = converter.toCopycatData(TOPIC, SAMPLE_STRING.getBytes()); - assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema()); - assertEquals(SAMPLE_STRING, data.value()); - } - - @Test - public void testBytesNullToString() { - SchemaAndValue data = converter.toCopycatData(TOPIC, null); - assertEquals(Schema.OPTIONAL_STRING_SCHEMA, 
data.schema()); - assertEquals(null, data.value()); - } - - @Test - public void testBytesToStringNonUtf8Encoding() throws UnsupportedEncodingException { - converter.configure(Collections.singletonMap("converter.encoding", "UTF-16"), true); - SchemaAndValue data = converter.toCopycatData(TOPIC, SAMPLE_STRING.getBytes("UTF-16")); - assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema()); - assertEquals(SAMPLE_STRING, data.value()); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/util/ConnectorUtilsTest.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/util/ConnectorUtilsTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/util/ConnectorUtilsTest.java deleted file mode 100644 index e46967b..0000000 --- a/copycat/api/src/test/java/org/apache/kafka/copycat/util/ConnectorUtilsTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ - -package org.apache.kafka.copycat.util; - -import org.junit.Test; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import static org.junit.Assert.assertEquals; - -public class ConnectorUtilsTest { - - private static final List<Integer> FIVE_ELEMENTS = Arrays.asList(1, 2, 3, 4, 5); - - @Test - public void testGroupPartitions() { - - List<List<Integer>> grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 1); - assertEquals(Arrays.asList(FIVE_ELEMENTS), grouped); - - grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 2); - assertEquals(Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5)), grouped); - - grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 3); - assertEquals(Arrays.asList(Arrays.asList(1, 2), - Arrays.asList(3, 4), - Arrays.asList(5)), grouped); - - grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 5); - assertEquals(Arrays.asList(Arrays.asList(1), - Arrays.asList(2), - Arrays.asList(3), - Arrays.asList(4), - Arrays.asList(5)), grouped); - - grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 7); - assertEquals(Arrays.asList(Arrays.asList(1), - Arrays.asList(2), - Arrays.asList(3), - Arrays.asList(4), - Arrays.asList(5), - Collections.EMPTY_LIST, - Collections.EMPTY_LIST), grouped); - } - - @Test(expected = IllegalArgumentException.class) - public void testGroupPartitionsInvalidCount() { - ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 0); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java ---------------------------------------------------------------------- diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java deleted file mode 100644 index d0d59a8..0000000 --- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkConnector.java +++ 
/dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.file; - -import org.apache.kafka.common.utils.AppInfoParser; -import org.apache.kafka.copycat.connector.Task; -import org.apache.kafka.copycat.sink.SinkConnector; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Very simple connector that works with the console. This connector supports both source and - * sink modes via its 'mode' setting. - */ -public class FileStreamSinkConnector extends SinkConnector { - public static final String FILE_CONFIG = "file"; - - private String filename; - - @Override - public String version() { - return AppInfoParser.getVersion(); - } - - @Override - public void start(Map<String, String> props) { - filename = props.get(FILE_CONFIG); - } - - @Override - public Class<? 
extends Task> taskClass() { - return FileStreamSinkTask.class; - } - - @Override - public List<Map<String, String>> taskConfigs(int maxTasks) { - ArrayList<Map<String, String>> configs = new ArrayList<>(); - for (int i = 0; i < maxTasks; i++) { - Map<String, String> config = new HashMap<>(); - if (filename != null) - config.put(FILE_CONFIG, filename); - configs.add(config); - } - return configs; - } - - @Override - public void stop() { - // Nothing to do since FileStreamSinkConnector has no background monitoring. - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java ---------------------------------------------------------------------- diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java deleted file mode 100644 index f95ef8e..0000000 --- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ - -package org.apache.kafka.copycat.file; - -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.sink.SinkRecord; -import org.apache.kafka.copycat.sink.SinkTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.PrintStream; -import java.util.Collection; -import java.util.Map; - -/** - * FileStreamSinkTask writes records to stdout or a file. - */ -public class FileStreamSinkTask extends SinkTask { - private static final Logger log = LoggerFactory.getLogger(FileStreamSinkTask.class); - - private String filename; - private PrintStream outputStream; - - public FileStreamSinkTask() { - } - - // for testing - public FileStreamSinkTask(PrintStream outputStream) { - filename = null; - this.outputStream = outputStream; - } - - @Override - public String version() { - return new FileStreamSinkConnector().version(); - } - - @Override - public void start(Map<String, String> props) { - filename = props.get(FileStreamSinkConnector.FILE_CONFIG); - if (filename == null) { - outputStream = System.out; - } else { - try { - outputStream = new PrintStream(new FileOutputStream(filename, true)); - } catch (FileNotFoundException e) { - throw new CopycatException("Couldn't find or create file for FileStreamSinkTask", e); - } - } - } - - @Override - public void put(Collection<SinkRecord> sinkRecords) { - for (SinkRecord record : sinkRecords) { - log.trace("Writing line to {}: {}", logFilename(), record.value()); - outputStream.println(record.value()); - } - } - - @Override - public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) { - log.trace("Flushing output stream for {}", logFilename()); - outputStream.flush(); - } - - @Override - public void stop() { - if (outputStream != System.out) - outputStream.close(); - } - - private 
String logFilename() { - return filename == null ? "stdout" : filename; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java ---------------------------------------------------------------------- diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java deleted file mode 100644 index 9021775..0000000 --- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceConnector.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.file; - -import org.apache.kafka.common.utils.AppInfoParser; -import org.apache.kafka.copycat.connector.Task; -import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.source.SourceConnector; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Very simple connector that works with the console. This connector supports both source and - * sink modes via its 'mode' setting. 
- */ -public class FileStreamSourceConnector extends SourceConnector { - public static final String TOPIC_CONFIG = "topic"; - public static final String FILE_CONFIG = "file"; - - private String filename; - private String topic; - - @Override - public String version() { - return AppInfoParser.getVersion(); - } - - @Override - public void start(Map<String, String> props) { - filename = props.get(FILE_CONFIG); - topic = props.get(TOPIC_CONFIG); - if (topic == null || topic.isEmpty()) - throw new CopycatException("FileStreamSourceConnector configuration must include 'topic' setting"); - if (topic.contains(",")) - throw new CopycatException("FileStreamSourceConnector should only have a single topic when used as a source."); - } - - @Override - public Class<? extends Task> taskClass() { - return FileStreamSourceTask.class; - } - - @Override - public List<Map<String, String>> taskConfigs(int maxTasks) { - ArrayList<Map<String, String>> configs = new ArrayList<>(); - // Only one input stream makes sense. - Map<String, String> config = new HashMap<>(); - if (filename != null) - config.put(FILE_CONFIG, filename); - config.put(TOPIC_CONFIG, topic); - configs.add(config); - return configs; - } - - @Override - public void stop() { - // Nothing to do since FileStreamSourceConnector has no background monitoring. 
- } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java ---------------------------------------------------------------------- diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java deleted file mode 100644 index 2a2cfbc..0000000 --- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java +++ /dev/null @@ -1,216 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.file; - -import org.apache.kafka.copycat.data.Schema; -import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.source.SourceRecord; -import org.apache.kafka.copycat.source.SourceTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.*; -import java.util.*; - -/** - * FileStreamSourceTask reads from stdin or a file. 
- */ -public class FileStreamSourceTask extends SourceTask { - private static final Logger log = LoggerFactory.getLogger(FileStreamSourceTask.class); - public static final String FILENAME_FIELD = "filename"; - public static final String POSITION_FIELD = "position"; - private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA; - - private String filename; - private InputStream stream; - private BufferedReader reader = null; - private char[] buffer = new char[1024]; - private int offset = 0; - private String topic = null; - - private Long streamOffset; - - @Override - public String version() { - return new FileStreamSourceConnector().version(); - } - - @Override - public void start(Map<String, String> props) { - filename = props.get(FileStreamSourceConnector.FILE_CONFIG); - if (filename == null || filename.isEmpty()) { - stream = System.in; - // Tracking offset for stdin doesn't make sense - streamOffset = null; - reader = new BufferedReader(new InputStreamReader(stream)); - } - topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG); - if (topic == null) - throw new CopycatException("FileStreamSourceTask config missing topic setting"); - } - - @Override - public List<SourceRecord> poll() throws InterruptedException { - if (stream == null) { - try { - stream = new FileInputStream(filename); - Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename)); - if (offset != null) { - Object lastRecordedOffset = offset.get(POSITION_FIELD); - if (lastRecordedOffset != null && !(lastRecordedOffset instanceof Long)) - throw new CopycatException("Offset position is the incorrect type"); - if (lastRecordedOffset != null) { - log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset); - long skipLeft = (Long) lastRecordedOffset; - while (skipLeft > 0) { - try { - long skipped = stream.skip(skipLeft); - skipLeft -= skipped; - } catch (IOException e) { - log.error("Error while trying to 
seek to previous offset in file: ", e); - throw new CopycatException(e); - } - } - log.debug("Skipped to offset {}", lastRecordedOffset); - } - streamOffset = (lastRecordedOffset != null) ? (Long) lastRecordedOffset : 0L; - } else { - streamOffset = 0L; - } - reader = new BufferedReader(new InputStreamReader(stream)); - log.debug("Opened {} for reading", logFilename()); - } catch (FileNotFoundException e) { - log.warn("Couldn't find file for FileStreamSourceTask, sleeping to wait for it to be created"); - synchronized (this) { - this.wait(1000); - } - return null; - } - } - - // Unfortunately we can't just use readLine() because it blocks in an uninterruptible way. - // Instead we have to manage splitting lines ourselves, using simple backoff when no new data - // is available. - try { - final BufferedReader readerCopy; - synchronized (this) { - readerCopy = reader; - } - if (readerCopy == null) - return null; - - ArrayList<SourceRecord> records = null; - - int nread = 0; - while (readerCopy.ready()) { - nread = readerCopy.read(buffer, offset, buffer.length - offset); - log.trace("Read {} bytes from {}", nread, logFilename()); - - if (nread > 0) { - offset += nread; - if (offset == buffer.length) { - char[] newbuf = new char[buffer.length * 2]; - System.arraycopy(buffer, 0, newbuf, 0, buffer.length); - buffer = newbuf; - } - - String line; - do { - line = extractLine(); - if (line != null) { - log.trace("Read a line from {}", logFilename()); - if (records == null) - records = new ArrayList<>(); - records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, VALUE_SCHEMA, line)); - } - new ArrayList<SourceRecord>(); - } while (line != null); - } - } - - if (nread <= 0) - synchronized (this) { - this.wait(1000); - } - - return records; - } catch (IOException e) { - // Underlying stream was killed, probably as a result of calling stop. Allow to return - // null, and driving thread will handle any shutdown if necessary. 
- } - return null; - } - - private String extractLine() { - int until = -1, newStart = -1; - for (int i = 0; i < offset; i++) { - if (buffer[i] == '\n') { - until = i; - newStart = i + 1; - break; - } else if (buffer[i] == '\r') { - // We need to check for \r\n, so we must skip this if we can't check the next char - if (i + 1 >= offset) - return null; - - until = i; - newStart = (buffer[i + 1] == '\n') ? i + 2 : i + 1; - break; - } - } - - if (until != -1) { - String result = new String(buffer, 0, until); - System.arraycopy(buffer, newStart, buffer, 0, buffer.length - newStart); - offset = offset - newStart; - if (streamOffset != null) - streamOffset += newStart; - return result; - } else { - return null; - } - } - - @Override - public void stop() { - log.trace("Stopping"); - synchronized (this) { - try { - if (stream != null && stream != System.in) { - stream.close(); - log.trace("Closed input stream"); - } - } catch (IOException e) { - log.error("Failed to close FileStreamSourceTask stream: ", e); - } - this.notify(); - } - } - - private Map<String, String> offsetKey(String filename) { - return Collections.singletonMap(FILENAME_FIELD, filename); - } - - private Map<String, Long> offsetValue(Long pos) { - return Collections.singletonMap(POSITION_FIELD, pos); - } - - private String logFilename() { - return filename == null ? 
"stdin" : filename; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java ---------------------------------------------------------------------- diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java deleted file mode 100644 index b30856f..0000000 --- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkConnectorTest.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ - -package org.apache.kafka.copycat.file; - -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.copycat.connector.ConnectorContext; -import org.apache.kafka.copycat.sink.SinkConnector; -import org.junit.Before; -import org.junit.Test; -import org.powermock.api.easymock.PowerMock; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; - -public class FileStreamSinkConnectorTest { - - private static final String MULTIPLE_TOPICS = "test1,test2"; - private static final String[] MULTIPLE_TOPICS_LIST - = MULTIPLE_TOPICS.split(","); - private static final List<TopicPartition> MULTIPLE_TOPICS_PARTITIONS = Arrays.asList( - new TopicPartition("test1", 1), new TopicPartition("test2", 2) - ); - private static final String FILENAME = "/afilename"; - - private FileStreamSinkConnector connector; - private ConnectorContext ctx; - private Map<String, String> sinkProperties; - - @Before - public void setup() { - connector = new FileStreamSinkConnector(); - ctx = PowerMock.createMock(ConnectorContext.class); - connector.initialize(ctx); - - sinkProperties = new HashMap<>(); - sinkProperties.put(SinkConnector.TOPICS_CONFIG, MULTIPLE_TOPICS); - sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, FILENAME); - } - - @Test - public void testSinkTasks() { - PowerMock.replayAll(); - - connector.start(sinkProperties); - List<Map<String, String>> taskConfigs = connector.taskConfigs(1); - assertEquals(1, taskConfigs.size()); - assertEquals(FILENAME, taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG)); - - taskConfigs = connector.taskConfigs(2); - assertEquals(2, taskConfigs.size()); - for (int i = 0; i < 2; i++) { - assertEquals(FILENAME, taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG)); - } - - PowerMock.verifyAll(); - } - - @Test - public void testTaskClass() { - PowerMock.replayAll(); - - connector.start(sinkProperties); - 
assertEquals(FileStreamSinkTask.class, connector.taskClass()); - - PowerMock.verifyAll(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java deleted file mode 100644 index ac8b5f1..0000000 --- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSinkTaskTest.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/

package org.apache.kafka.copycat.file;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.sink.SinkRecord;
import org.junit.Before;
import org.junit.Test;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.HashMap;

import static org.junit.Assert.assertEquals;

/**
 * Tests that {@link FileStreamSinkTask} writes the values of the records it is given to the
 * stream supplied at construction time.
 */
public class FileStreamSinkTaskTest {

    private FileStreamSinkTask task;
    private ByteArrayOutputStream captured;
    private PrintStream sink;

    /**
     * Builds a task that writes into an in-memory buffer so its output can be inspected.
     */
    @Before
    public void setup() {
        captured = new ByteArrayOutputStream();
        sink = new PrintStream(captured);
        task = new FileStreamSinkTask(sink);
    }

    /**
     * Puts records in two batches, flushing after each, and checks the accumulated output.
     */
    @Test
    public void testPutFlush() {
        // task.start() is deliberately not called, since it would override the output stream.
        final TopicPartition firstPartition = new TopicPartition("topic1", 0);
        final TopicPartition secondPartition = new TopicPartition("topic2", 0);
        HashMap<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();

        // First batch: a single record on topic1.
        task.put(Arrays.asList(
                stringRecord("topic1", "line1", 1)
        ));
        committed.put(firstPartition, new OffsetAndMetadata(1L));
        task.flush(committed);
        assertEquals("line1\n", captured.toString());

        // Second batch: one more record on topic1 and the first record on topic2.
        task.put(Arrays.asList(
                stringRecord("topic1", "line2", 2),
                stringRecord("topic2", "line3", 1)
        ));
        committed.put(firstPartition, new OffsetAndMetadata(2L));
        committed.put(secondPartition, new OffsetAndMetadata(1L));
        task.flush(committed);
        assertEquals("line1\nline2\nline3\n", captured.toString());
    }

    /**
     * Creates a record on partition 0 of {@code topic} with no key and a string value.
     */
    private static SinkRecord stringRecord(String topic, String value, int offset) {
        return new SinkRecord(topic, 0, null, null, Schema.STRING_SCHEMA, value, offset);
    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java ---------------------------------------------------------------------- diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java
b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java deleted file mode 100644 index 28bfa62..0000000 --- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceConnectorTest.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/

package org.apache.kafka.copycat.file;

import org.apache.kafka.copycat.connector.ConnectorContext;
import org.apache.kafka.copycat.errors.CopycatException;
import org.junit.Before;
import org.junit.Test;
import org.powermock.api.easymock.PowerMock;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

/**
 * Tests for {@link FileStreamSourceConnector}: generated task configurations, handling of a
 * missing file setting, rejection of multiple topics, and the reported task class.
 */
public class FileStreamSourceConnectorTest {

    private static final String SINGLE_TOPIC = "test";
    private static final String MULTIPLE_TOPICS = "test1,test2";
    private static final String FILENAME = "/somefilename";

    private FileStreamSourceConnector connector;
    private ConnectorContext ctx;
    private Map<String, String> sourceProperties;

    /**
     * Creates a connector initialized with a mocked context and a default set of properties
     * naming a single topic and a file.
     */
    @Before
    public void setup() {
        connector = new FileStreamSourceConnector();
        ctx = PowerMock.createMock(ConnectorContext.class);
        connector.initialize(ctx);

        Map<String, String> defaults = new HashMap<>();
        defaults.put(FileStreamSourceConnector.TOPIC_CONFIG, SINGLE_TOPIC);
        defaults.put(FileStreamSourceConnector.FILE_CONFIG, FILENAME);
        sourceProperties = defaults;
    }

    /**
     * Requests one task and then two tasks; in both cases exactly one config carrying the
     * configured file and topic must come back.
     */
    @Test
    public void testSourceTasks() {
        PowerMock.replayAll();

        connector.start(sourceProperties);
        // Should be able to return fewer than requested #, so the result is the same whether
        // one or two tasks are asked for.
        for (int maxTasks = 1; maxTasks <= 2; maxTasks++) {
            List<Map<String, String>> taskConfigs = connector.taskConfigs(maxTasks);
            assertEquals(1, taskConfigs.size());

            Map<String, String> onlyConfig = taskConfigs.get(0);
            assertEquals(FILENAME, onlyConfig.get(FileStreamSourceConnector.FILE_CONFIG));
            assertEquals(SINGLE_TOPIC, onlyConfig.get(FileStreamSourceConnector.TOPIC_CONFIG));
        }

        PowerMock.verifyAll();
    }

    /**
     * With the file setting removed, the single generated task config must have no file entry.
     */
    @Test
    public void testSourceTasksStdin() {
        PowerMock.replayAll();

        sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG);
        connector.start(sourceProperties);

        List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
        assertEquals(1, taskConfigs.size());
        Map<String, String> onlyConfig = taskConfigs.get(0);
        assertNull(onlyConfig.get(FileStreamSourceConnector.FILE_CONFIG));

        PowerMock.verifyAll();
    }

    /**
     * Starting the connector with a comma-separated list of topics must fail.
     */
    @Test(expected = CopycatException.class)
    public void testMultipleSourcesInvalid() {
        sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, MULTIPLE_TOPICS);
        connector.start(sourceProperties);
    }

    /**
     * The connector must report {@link FileStreamSourceTask} as its task class.
     */
    @Test
    public void testTaskClass() {
        PowerMock.replayAll();

        connector.start(sourceProperties);
        assertEquals(FileStreamSourceTask.class, connector.taskClass());

        PowerMock.verifyAll();
    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java deleted file mode 100644 index ddf8e43..0000000 --- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.file; - -import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.source.SourceRecord; -import org.apache.kafka.copycat.source.SourceTaskContext; -import org.apache.kafka.copycat.storage.OffsetStorageReader; -import org.easymock.EasyMock; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.powermock.api.easymock.PowerMock; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; - -public class FileStreamSourceTaskTest { - - private static final String TOPIC = "test"; - - private File tempFile; - private Map<String, String> config; - private OffsetStorageReader offsetStorageReader; - private SourceTaskContext context; - private FileStreamSourceTask task; - - private boolean verifyMocks = false; - - @Before - public void setup() throws IOException { - tempFile = File.createTempFile("file-stream-source-task-test", null); - config = new HashMap<>(); - config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath()); - config.put(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC); - task = new FileStreamSourceTask(); - offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class); - context = PowerMock.createMock(SourceTaskContext.class); - task.initialize(context); - } - - @After - public void teardown() { - tempFile.delete(); - - if (verifyMocks) - 
PowerMock.verifyAll(); - } - - private void replay() { - PowerMock.replayAll(); - verifyMocks = true; - } - - @Test - public void testNormalLifecycle() throws InterruptedException, IOException { - expectOffsetLookupReturnNone(); - replay(); - - task.start(config); - - FileOutputStream os = new FileOutputStream(tempFile); - assertEquals(null, task.poll()); - os.write("partial line".getBytes()); - os.flush(); - assertEquals(null, task.poll()); - os.write(" finished\n".getBytes()); - os.flush(); - List<SourceRecord> records = task.poll(); - assertEquals(1, records.size()); - assertEquals(TOPIC, records.get(0).topic()); - assertEquals("partial line finished", records.get(0).value()); - assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition()); - assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 22L), records.get(0).sourceOffset()); - assertEquals(null, task.poll()); - - // Different line endings, and make sure the final \r doesn't result in a line until we can - // read the subsequent byte. 
- os.write("line1\rline2\r\nline3\nline4\n\r".getBytes()); - os.flush(); - records = task.poll(); - assertEquals(4, records.size()); - assertEquals("line1", records.get(0).value()); - assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition()); - assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 28L), records.get(0).sourceOffset()); - assertEquals("line2", records.get(1).value()); - assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(1).sourcePartition()); - assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 35L), records.get(1).sourceOffset()); - assertEquals("line3", records.get(2).value()); - assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(2).sourcePartition()); - assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 41L), records.get(2).sourceOffset()); - assertEquals("line4", records.get(3).value()); - assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(3).sourcePartition()); - assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 47L), records.get(3).sourceOffset()); - - os.write("subsequent text".getBytes()); - os.flush(); - records = task.poll(); - assertEquals(1, records.size()); - assertEquals("", records.get(0).value()); - assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition()); - assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 48L), records.get(0).sourceOffset()); - - task.stop(); - } - - @Test(expected = CopycatException.class) - public void testMissingTopic() throws InterruptedException { - replay(); - - config.remove(FileStreamSourceConnector.TOPIC_CONFIG); - task.start(config); - } - 
- public void testInvalidFile() throws InterruptedException { - config.put(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename"); - task.start(config); - // Currently the task retries indefinitely if the file isn't found, but shouldn't return any data. - for (int i = 0; i < 100; i++) - assertEquals(null, task.poll()); - } - - - private void expectOffsetLookupReturnNone() { - EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader); - EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject(Map.class))).andReturn(null); - } -} \ No newline at end of file