http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java ---------------------------------------------------------------------- diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java new file mode 100644 index 0000000..0b1760b --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java @@ -0,0 +1,495 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + **/ + +package org.apache.kafka.connect.data; + +import org.apache.kafka.connect.data.Schema.Type; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.errors.SchemaProjectorException; +import org.junit.Test; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class SchemaProjectorTest { + + @Test + public void testPrimitiveTypeProjection() throws Exception { + Object projected; + projected = SchemaProjector.project(Schema.BOOLEAN_SCHEMA, false, Schema.BOOLEAN_SCHEMA); + assertEquals(false, projected); + + byte[] bytes = {(byte) 1, (byte) 2}; + projected = SchemaProjector.project(Schema.BYTES_SCHEMA, bytes, Schema.BYTES_SCHEMA); + assertEquals(bytes, projected); + + projected = SchemaProjector.project(Schema.STRING_SCHEMA, "abc", Schema.STRING_SCHEMA); + assertEquals("abc", projected); + + projected = SchemaProjector.project(Schema.BOOLEAN_SCHEMA, false, Schema.OPTIONAL_BOOLEAN_SCHEMA); + assertEquals(false, projected); + + projected = SchemaProjector.project(Schema.BYTES_SCHEMA, bytes, Schema.OPTIONAL_BYTES_SCHEMA); + assertEquals(bytes, projected); + + projected = SchemaProjector.project(Schema.STRING_SCHEMA, "abc", Schema.OPTIONAL_STRING_SCHEMA); + assertEquals("abc", projected); + + try { + SchemaProjector.project(Schema.OPTIONAL_BOOLEAN_SCHEMA, false, Schema.BOOLEAN_SCHEMA); + fail("Cannot project optional schema to schema with no default value."); + } catch (DataException e) { + // expected + } + + try { + SchemaProjector.project(Schema.OPTIONAL_BYTES_SCHEMA, bytes, Schema.BYTES_SCHEMA); + fail("Cannot project optional schema to schema with no default value."); + } catch (DataException e) { + // expected + } + + try { + SchemaProjector.project(Schema.OPTIONAL_STRING_SCHEMA, "abc", Schema.STRING_SCHEMA); + fail("Cannot project optional schema to schema with no default value."); + } catch (DataException e) { + // expected + } + } + + @Test + public void testNumericTypeProjection() throws 
Exception {
+        Schema[] promotableSchemas = {Schema.INT8_SCHEMA, Schema.INT16_SCHEMA, Schema.INT32_SCHEMA, Schema.INT64_SCHEMA, Schema.FLOAT32_SCHEMA, Schema.FLOAT64_SCHEMA};
+        Schema[] promotableOptionalSchemas = {Schema.OPTIONAL_INT8_SCHEMA, Schema.OPTIONAL_INT16_SCHEMA, Schema.OPTIONAL_INT32_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA,
+                                              Schema.OPTIONAL_FLOAT32_SCHEMA, Schema.OPTIONAL_FLOAT64_SCHEMA};
+
+        Object[] values = {(byte) 127, (short) 255, 32767, 327890L, 1.2F, 1.2345};
+        Map<Object, List<?>> expectedProjected = new HashMap<>();
+        expectedProjected.put(values[0], Arrays.asList((byte) 127, (short) 127, 127, 127L, 127.F, 127.));
+        expectedProjected.put(values[1], Arrays.asList((short) 255, 255, 255L, 255.F, 255.));
+        expectedProjected.put(values[2], Arrays.asList(32767, 32767L, 32767.F, 32767.));
+        expectedProjected.put(values[3], Arrays.asList(327890L, 327890.F, 327890.));
+        expectedProjected.put(values[4], Arrays.asList(1.2F, 1.2));
+        expectedProjected.put(values[5], Arrays.asList(1.2345));
+
+        Object promoted;
+        for (int i = 0; i < promotableSchemas.length; ++i) {
+            Schema source = promotableSchemas[i];
+            List<?> expected = expectedProjected.get(values[i]);
+            for (int j = i; j < promotableSchemas.length; ++j) {
+                Schema target = promotableSchemas[j];
+                promoted = SchemaProjector.project(source, values[i], target);
+                if (target.type() == Type.FLOAT64) {
+                    assertEquals((Double) (expected.get(j - i)), (double) promoted, 1e-6);
+                } else {
+                    assertEquals(expected.get(j - i), promoted);
+                }
+            }
+            for (int j = i; j < promotableOptionalSchemas.length; ++j) {
+                Schema target = promotableOptionalSchemas[j];
+                promoted = SchemaProjector.project(source, values[i], target);
+                if (target.type() == Type.FLOAT64) {
+                    assertEquals((Double) (expected.get(j - i)), (double) promoted, 1e-6);
+                } else {
+                    assertEquals(expected.get(j - i), promoted);
+                }
+            }
+        }
+
+        for (int i = 0; i < promotableOptionalSchemas.length; ++i) {
+            Schema source = promotableOptionalSchemas[i];
+            List<?> expected = expectedProjected.get(values[i]);
+            for (int j = i; j < promotableOptionalSchemas.length; ++j) {
+                Schema target = promotableOptionalSchemas[j];
+                promoted = SchemaProjector.project(source, values[i], target);
+                if (target.type() == Type.FLOAT64) {
+                    assertEquals((Double) (expected.get(j - i)), (double) promoted, 1e-6);
+                } else {
+                    assertEquals(expected.get(j - i), promoted);
+                }
+            }
+        }
+
+        Schema[] nonPromotableSchemas = {Schema.BOOLEAN_SCHEMA, Schema.BYTES_SCHEMA, Schema.STRING_SCHEMA};
+        for (Schema promotableSchema: promotableSchemas) {
+            for (Schema nonPromotableSchema: nonPromotableSchemas) {
+                Object dummy = new Object();
+                try {
+                    SchemaProjector.project(promotableSchema, dummy, nonPromotableSchema);
+                    fail("Cannot promote " + promotableSchema.type() + " to " + nonPromotableSchema.type());
+                } catch (DataException e) {
+                    // expected
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testPrimitiveOptionalProjection() throws Exception {
+        verifyOptionalProjection(Schema.OPTIONAL_BOOLEAN_SCHEMA, Type.BOOLEAN, false, true, false, true);
+        verifyOptionalProjection(Schema.OPTIONAL_BOOLEAN_SCHEMA, Type.BOOLEAN, false, true, false, false);
+
+        byte[] bytes = {(byte) 1, (byte) 2};
+        byte[] defaultBytes = {(byte) 3, (byte) 4};
+        verifyOptionalProjection(Schema.OPTIONAL_BYTES_SCHEMA, Type.BYTES, bytes, defaultBytes, bytes, true);
+        verifyOptionalProjection(Schema.OPTIONAL_BYTES_SCHEMA, Type.BYTES, bytes, defaultBytes, bytes, false);
+
+        verifyOptionalProjection(Schema.OPTIONAL_STRING_SCHEMA, Type.STRING, "abc", "def", "abc", true);
+
verifyOptionalProjection(Schema.OPTIONAL_STRING_SCHEMA, Type.STRING, "abc", "def", "abc", false); + + verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT8, (byte) 12, (byte) 127, (byte) 12, true); + verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT8, (byte) 12, (byte) 127, (byte) 12, false); + verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT16, (byte) 12, (short) 127, (short) 12, true); + verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT16, (byte) 12, (short) 127, (short) 12, false); + verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT32, (byte) 12, 12789, 12, true); + verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT32, (byte) 12, 12789, 12, false); + verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT64, (byte) 12, 127890L, 12L, true); + verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT64, (byte) 12, 127890L, 12L, false); + verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT32, (byte) 12, 3.45F, 12.F, true); + verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT32, (byte) 12, 3.45F, 12.F, false); + verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT64, (byte) 12, 3.4567, 12., true); + verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT64, (byte) 12, 3.4567, 12., false); + + verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT16, (short) 12, (short) 127, (short) 12, true); + verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT16, (short) 12, (short) 127, (short) 12, false); + verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT32, (short) 12, 12789, 12, true); + verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT32, (short) 12, 12789, 12, false); + verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT64, (short) 12, 127890L, 12L, true); + verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT64, (short) 12, 127890L, 12L, false); + verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT32, (short) 12, 3.45F, 12.F, true); + verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT32, (short) 12, 3.45F, 12.F, false); + verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT64, (short) 12, 3.4567, 12., true); + verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT64, (short) 12, 3.4567, 12., false); + + verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT32, 12, 12789, 12, true); + verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT32, 12, 12789, 12, false); + verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT64, 12, 127890L, 12L, true); + verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT64, 12, 127890L, 12L, false); + verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT32, 12, 3.45F, 12.F, true); + verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT32, 12, 3.45F, 12.F, false); + verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT64, 12, 3.4567, 12., true); + verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT64, 12, 3.4567, 12., false); + + verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.INT64, 12L, 127890L, 12L, true); + verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.INT64, 12L, 127890L, 12L, false); + verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT32, 12L, 3.45F, 12.F, true); + verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT32, 12L, 3.45F, 12.F, false); + 
verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT64, 12L, 3.4567, 12., true);
+        verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT64, 12L, 3.4567, 12., false);
+
+        verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT32, 12.345F, 3.45F, 12.345F, true);
+        verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT32, 12.345F, 3.45F, 12.345F, false);
+        verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345F, 3.4567, 12.345, true);
+        verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345F, 3.4567, 12.345, false);
+
+        verifyOptionalProjection(Schema.OPTIONAL_FLOAT64_SCHEMA, Type.FLOAT64, 12.345, 3.4567, 12.345, true);
+        verifyOptionalProjection(Schema.OPTIONAL_FLOAT64_SCHEMA, Type.FLOAT64, 12.345, 3.4567, 12.345, false);
+    }
+
+    @Test
+    public void testStructAddField() throws Exception {
+        Schema source = SchemaBuilder.struct()
+                .field("field", Schema.INT32_SCHEMA)
+                .build();
+        Struct sourceStruct = new Struct(source);
+        sourceStruct.put("field", 1);
+
+        Schema target = SchemaBuilder.struct()
+                .field("field", Schema.INT32_SCHEMA)
+                .field("field2", SchemaBuilder.int32().defaultValue(123).build())
+                .build();
+
+        Struct targetStruct = (Struct) SchemaProjector.project(source, sourceStruct, target);
+
+        assertEquals(1, (int) targetStruct.getInt32("field"));
+        assertEquals(123, (int) targetStruct.getInt32("field2"));
+
+        Schema incompatibleTargetSchema = SchemaBuilder.struct()
+                .field("field", Schema.INT32_SCHEMA)
+                .field("field2", Schema.INT32_SCHEMA)
+                .build();
+
+        try {
+            SchemaProjector.project(source, sourceStruct, incompatibleTargetSchema);
+            fail("Incompatible schema.");
+        } catch (DataException e) {
+            // expected
+        }
+    }
+
+    @Test
+    public void testStructRemoveField() throws Exception {
+        Schema source = SchemaBuilder.struct()
+                .field("field", Schema.INT32_SCHEMA)
+                .field("field2", Schema.INT32_SCHEMA)
+                .build();
+        Struct sourceStruct = new Struct(source);
+        sourceStruct.put("field", 1);
+        sourceStruct.put("field2", 234);
+
+        Schema target = SchemaBuilder.struct()
+                .field("field", Schema.INT32_SCHEMA)
+                .build();
+        Struct targetStruct = (Struct) SchemaProjector.project(source, sourceStruct, target);
+
+        assertEquals(1, targetStruct.get("field"));
+        try {
+            targetStruct.get("field2");
+            fail("field2 is not part of the projected struct");
+        } catch (DataException e) {
+            // expected
+        }
+    }
+
+    @Test
+    public void testStructDefaultValue() throws Exception {
+        Schema source = SchemaBuilder.struct().optional()
+                .field("field", Schema.INT32_SCHEMA)
+                .field("field2", Schema.INT32_SCHEMA)
+                .build();
+
+        SchemaBuilder builder = SchemaBuilder.struct()
+                .field("field", Schema.INT32_SCHEMA)
+                .field("field2", Schema.INT32_SCHEMA);
+
+        Struct defaultStruct = new Struct(builder).put("field", 12).put("field2", 345);
+        builder.defaultValue(defaultStruct);
+        Schema target = builder.build();
+
+        Object projected = SchemaProjector.project(source, null, target);
+        assertEquals(defaultStruct, projected);
+
+        Struct sourceStruct = new Struct(source).put("field", 45).put("field2", 678);
+        Struct targetStruct = (Struct) SchemaProjector.project(source, sourceStruct, target);
+
+        assertEquals(sourceStruct.get("field"), targetStruct.get("field"));
+        assertEquals(sourceStruct.get("field2"), targetStruct.get("field2"));
+    }
+
+    @Test
+    public void testNestedSchemaProjection() throws Exception {
+        Schema sourceFlatSchema = SchemaBuilder.struct()
+                .field("field", Schema.INT32_SCHEMA)
+                .build();
+        Schema
targetFlatSchema = SchemaBuilder.struct() + .field("field", Schema.INT32_SCHEMA) + .field("field2", SchemaBuilder.int32().defaultValue(123).build()) + .build(); + Schema sourceNestedSchema = SchemaBuilder.struct() + .field("first", Schema.INT32_SCHEMA) + .field("second", Schema.STRING_SCHEMA) + .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build()) + .field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build()) + .field("nested", sourceFlatSchema) + .build(); + Schema targetNestedSchema = SchemaBuilder.struct() + .field("first", Schema.INT32_SCHEMA) + .field("second", Schema.STRING_SCHEMA) + .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build()) + .field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build()) + .field("nested", targetFlatSchema) + .build(); + + Struct sourceFlatStruct = new Struct(sourceFlatSchema); + sourceFlatStruct.put("field", 113); + + Struct sourceNestedStruct = new Struct(sourceNestedSchema); + sourceNestedStruct.put("first", 1); + sourceNestedStruct.put("second", "abc"); + sourceNestedStruct.put("array", Arrays.asList(1, 2)); + sourceNestedStruct.put("map", Collections.singletonMap(5, "def")); + sourceNestedStruct.put("nested", sourceFlatStruct); + + Struct targetNestedStruct = (Struct) SchemaProjector.project(sourceNestedSchema, sourceNestedStruct, + targetNestedSchema); + assertEquals(1, targetNestedStruct.get("first")); + assertEquals("abc", targetNestedStruct.get("second")); + assertEquals(Arrays.asList(1, 2), (List<Integer>) targetNestedStruct.get("array")); + assertEquals(Collections.singletonMap(5, "def"), (Map<Integer, String>) targetNestedStruct.get("map")); + + Struct projectedStruct = (Struct) targetNestedStruct.get("nested"); + assertEquals(113, projectedStruct.get("field")); + assertEquals(123, projectedStruct.get("field2")); + } + + @Test + public void testLogicalTypeProjection() throws Exception { + Schema[] logicalTypeSchemas = {Decimal.schema(2), Date.SCHEMA, Time.SCHEMA, Timestamp.SCHEMA}; + Object projected; + + BigDecimal testDecimal = new BigDecimal(new BigInteger("156"), 2); + projected = SchemaProjector.project(Decimal.schema(2), testDecimal, Decimal.schema(2)); + assertEquals(testDecimal, projected); + + projected = SchemaProjector.project(Date.SCHEMA, 1000, Date.SCHEMA); + assertEquals(1000, projected); + + projected = SchemaProjector.project(Time.SCHEMA, 231, Time.SCHEMA); + assertEquals(231, projected); + + projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA); + assertEquals(34567L, projected); + + Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build(); + for (Schema logicalTypeSchema: logicalTypeSchemas) { + try { + SchemaProjector.project(logicalTypeSchema, null, Schema.BOOLEAN_SCHEMA); + fail("Cannot project logical types to non-logical types."); + } catch (SchemaProjectorException e) { + // expected + } + + try { + SchemaProjector.project(logicalTypeSchema, null, namedSchema); + fail("Reader name is not a valid logical type name."); + } catch (SchemaProjectorException e) { + // expected + } + + try { + SchemaProjector.project(Schema.BOOLEAN_SCHEMA, null, logicalTypeSchema); + fail("Cannot project non-logical types to logical types."); + } catch (SchemaProjectorException e) { + // expected + } + } + } + + @Test + public void testArrayProjection() throws Exception { + Schema source = SchemaBuilder.array(Schema.INT32_SCHEMA).build(); + + Object projected = SchemaProjector.project(source, Arrays.asList(1, 2, 3), source); 
+ assertEquals(Arrays.asList(1, 2, 3), (List<Integer>) projected); + + Schema optionalSource = SchemaBuilder.array(Schema.INT32_SCHEMA).optional().build(); + Schema target = SchemaBuilder.array(Schema.INT32_SCHEMA).defaultValue(Arrays.asList(1, 2, 3)).build(); + projected = SchemaProjector.project(optionalSource, Arrays.asList(4, 5), target); + assertEquals(Arrays.asList(4, 5), (List<Integer>) projected); + projected = SchemaProjector.project(optionalSource, null, target); + assertEquals(Arrays.asList(1, 2, 3), (List<Integer>) projected); + + Schema promotedTarget = SchemaBuilder.array(Schema.INT64_SCHEMA).defaultValue(Arrays.asList(1L, 2L, 3L)).build(); + projected = SchemaProjector.project(optionalSource, Arrays.asList(4, 5), promotedTarget); + List<Long> expectedProjected = Arrays.asList(4L, 5L); + assertEquals(expectedProjected, (List<Long>) projected); + projected = SchemaProjector.project(optionalSource, null, promotedTarget); + assertEquals(Arrays.asList(1L, 2L, 3L), (List<Long>) projected); + + Schema noDefaultValueTarget = SchemaBuilder.array(Schema.INT32_SCHEMA).build(); + try { + SchemaProjector.project(optionalSource, null, noDefaultValueTarget); + fail("Target schema does not provide a default value."); + } catch (SchemaProjectorException e) { + // expected + } + + Schema nonPromotableTarget = SchemaBuilder.array(Schema.BOOLEAN_SCHEMA).build(); + try { + SchemaProjector.project(optionalSource, null, nonPromotableTarget); + fail("Neither source type matches target type nor source type can be promoted to target type"); + } catch (SchemaProjectorException e) { + // expected + } + } + + @Test + public void testMapProjection() throws Exception { + Schema source = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).optional().build(); + + Schema target = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).defaultValue(Collections.singletonMap(1, 2)).build(); + Object projected = SchemaProjector.project(source, Collections.singletonMap(3, 4), target); + assertEquals(Collections.singletonMap(3, 4), (Map<Integer, Integer>) projected); + projected = SchemaProjector.project(source, null, target); + assertEquals(Collections.singletonMap(1, 2), (Map<Integer, Integer>) projected); + + Schema promotedTarget = SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.FLOAT32_SCHEMA).defaultValue( + Collections.singletonMap(3L, 4.5F)).build(); + projected = SchemaProjector.project(source, Collections.singletonMap(3, 4), promotedTarget); + assertEquals(Collections.singletonMap(3L, 4.F), (Map<Long, Float>) projected); + projected = SchemaProjector.project(source, null, promotedTarget); + assertEquals(Collections.singletonMap(3L, 4.5F), (Map<Long, Float>) projected); + + Schema noDefaultValueTarget = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build(); + try { + SchemaProjector.project(source, null, noDefaultValueTarget); + fail("Reader does not provide a default value."); + } catch (SchemaProjectorException e) { + // expected + } + + Schema nonPromotableTarget = SchemaBuilder.map(Schema.BOOLEAN_SCHEMA, Schema.STRING_SCHEMA).build(); + try { + SchemaProjector.project(source, null, nonPromotableTarget); + fail("Neither source type matches target type nor source type can be promoted to target type"); + } catch (SchemaProjectorException e) { + // expected + } + } + + @Test + public void testMaybeCompatible() throws Exception { + Schema source = SchemaBuilder.int32().name("source").build(); + Schema target = SchemaBuilder.int32().name("target").build(); + + try { + 
SchemaProjector.project(source, 12, target); + fail("Source name and target name mismatch."); + } catch (SchemaProjectorException e) { + // expected + } + + Schema targetWithParameters = SchemaBuilder.int32().parameters(Collections.singletonMap("key", "value")); + try { + SchemaProjector.project(source, 34, targetWithParameters); + fail("Source parameters and target parameters mismatch."); + } catch (SchemaProjectorException e) { + // expected + } + } + + private void verifyOptionalProjection(Schema source, Type targetType, Object value, Object defaultValue, Object expectedProjected, boolean optional) { + Schema target; + assert source.isOptional(); + assert value != null; + if (optional) { + target = SchemaBuilder.type(targetType).optional().defaultValue(defaultValue).build(); + } else { + target = SchemaBuilder.type(targetType).defaultValue(defaultValue).build(); + } + Object projected = SchemaProjector.project(source, value, target); + if (targetType == Type.FLOAT64) { + assertEquals((double) expectedProjected, (double) projected, 1e-6); + } else { + assertEquals(expectedProjected, projected); + } + + projected = SchemaProjector.project(source, null, target); + if (optional) { + assertEquals(null, projected); + } else { + assertEquals(defaultValue, projected); + } + } +}
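For orientation before the next file: the API exercised throughout the test above is SchemaProjector.project(sourceSchema, value, targetSchema), which re-shapes a value written with one schema so that it conforms to a compatible target schema, filling fields missing from the source using the target's default values. A minimal usage sketch (the schema and field names below are illustrative, not part of this patch):

    Schema v1 = SchemaBuilder.struct().field("id", Schema.INT32_SCHEMA).build();
    Schema v2 = SchemaBuilder.struct()
            .field("id", Schema.INT32_SCHEMA)
            .field("note", SchemaBuilder.string().defaultValue("n/a").build())
            .build();

    Struct oldRecord = new Struct(v1).put("id", 42);
    // "note" is absent from the v1 record, so the projection fills it from the v2 default.
    Struct newRecord = (Struct) SchemaProjector.project(v1, oldRecord, v2);
    assert "n/a".equals(newRecord.getString("note"));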
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java ---------------------------------------------------------------------- diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java new file mode 100644 index 0000000..c73992b --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.data; + +import org.apache.kafka.connect.errors.DataException; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +public class StructTest { + + private static final Schema FLAT_STRUCT_SCHEMA = SchemaBuilder.struct() + .field("int8", Schema.INT8_SCHEMA) + .field("int16", Schema.INT16_SCHEMA) + .field("int32", Schema.INT32_SCHEMA) + .field("int64", Schema.INT64_SCHEMA) + .field("float32", Schema.FLOAT32_SCHEMA) + .field("float64", Schema.FLOAT64_SCHEMA) + .field("boolean", Schema.BOOLEAN_SCHEMA) + .field("string", Schema.STRING_SCHEMA) + .field("bytes", Schema.BYTES_SCHEMA) + .build(); + + private static final Schema ARRAY_SCHEMA = SchemaBuilder.array(Schema.INT8_SCHEMA).build(); + private static final Schema MAP_SCHEMA = SchemaBuilder.map( + Schema.INT32_SCHEMA, + Schema.STRING_SCHEMA + ).build(); + private static final Schema NESTED_CHILD_SCHEMA = SchemaBuilder.struct() + .field("int8", Schema.INT8_SCHEMA) + .build(); + private static final Schema NESTED_SCHEMA = SchemaBuilder.struct() + .field("array", ARRAY_SCHEMA) + .field("map", MAP_SCHEMA) + .field("nested", NESTED_CHILD_SCHEMA) + .build(); + + private static final Schema REQUIRED_FIELD_SCHEMA = Schema.INT8_SCHEMA; + private static final Schema OPTIONAL_FIELD_SCHEMA = SchemaBuilder.int8().optional().build(); + private static final Schema DEFAULT_FIELD_SCHEMA = SchemaBuilder.int8().defaultValue((byte) 0).build(); + + @Test + public void testFlatStruct() { + Struct struct = new Struct(FLAT_STRUCT_SCHEMA) + .put("int8", (byte) 12) + .put("int16", (short) 12) + .put("int32", 12) + .put("int64", (long) 12) + .put("float32", 12.f) + .put("float64", 12.) 
+ .put("boolean", true) + .put("string", "foobar") + .put("bytes", "foobar".getBytes()); + + // Test equality, and also the type-specific getters + assertEquals((byte) 12, (byte) struct.getInt8("int8")); + assertEquals((short) 12, (short) struct.getInt16("int16")); + assertEquals(12, (int) struct.getInt32("int32")); + assertEquals((long) 12, (long) struct.getInt64("int64")); + assertEquals((Float) 12.f, struct.getFloat32("float32")); + assertEquals((Double) 12., struct.getFloat64("float64")); + assertEquals(true, struct.getBoolean("boolean")); + assertEquals("foobar", struct.getString("string")); + assertEquals(ByteBuffer.wrap("foobar".getBytes()), ByteBuffer.wrap(struct.getBytes("bytes"))); + + struct.validate(); + } + + @Test + public void testComplexStruct() { + List<Byte> array = Arrays.asList((byte) 1, (byte) 2); + Map<Integer, String> map = Collections.singletonMap(1, "string"); + Struct struct = new Struct(NESTED_SCHEMA) + .put("array", array) + .put("map", map) + .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", (byte) 12)); + + // Separate the call to get the array and map to validate the typed get methods work properly + List<Byte> arrayExtracted = struct.getArray("array"); + assertEquals(array, arrayExtracted); + Map<Byte, Byte> mapExtracted = struct.getMap("map"); + assertEquals(map, mapExtracted); + assertEquals((byte) 12, struct.getStruct("nested").get("int8")); + + struct.validate(); + } + + + // These don't test all the ways validation can fail, just one for each element. See more extensive validation + // tests in SchemaTest. These are meant to ensure that we are invoking the same code path and that we do deeper + // inspection than just checking the class of the object + + @Test(expected = DataException.class) + public void testInvalidFieldType() { + new Struct(FLAT_STRUCT_SCHEMA).put("int8", "should fail because this is a string, not int8"); + } + + @Test(expected = DataException.class) + public void testInvalidArrayFieldElements() { + new Struct(NESTED_SCHEMA).put("array", Arrays.asList("should fail since elements should be int8s")); + } + + @Test(expected = DataException.class) + public void testInvalidMapKeyElements() { + new Struct(NESTED_SCHEMA).put("map", Collections.singletonMap("should fail because keys should be int8s", (byte) 12)); + } + + @Test(expected = DataException.class) + public void testInvalidStructFieldSchema() { + new Struct(NESTED_SCHEMA).put("nested", new Struct(MAP_SCHEMA)); + } + + @Test(expected = DataException.class) + public void testInvalidStructFieldValue() { + new Struct(NESTED_SCHEMA).put("nested", new Struct(NESTED_CHILD_SCHEMA)); + } + + + @Test(expected = DataException.class) + public void testMissingFieldValidation() { + // Required int8 field + Schema schema = SchemaBuilder.struct().field("field", REQUIRED_FIELD_SCHEMA).build(); + Struct struct = new Struct(schema); + struct.validate(); + } + + @Test + public void testMissingOptionalFieldValidation() { + Schema schema = SchemaBuilder.struct().field("field", OPTIONAL_FIELD_SCHEMA).build(); + Struct struct = new Struct(schema); + struct.validate(); + } + + @Test + public void testMissingFieldWithDefaultValidation() { + Schema schema = SchemaBuilder.struct().field("field", DEFAULT_FIELD_SCHEMA).build(); + Struct struct = new Struct(schema); + struct.validate(); + } + + + @Test + public void testEquals() { + Struct struct1 = new Struct(FLAT_STRUCT_SCHEMA) + .put("int8", (byte) 12) + .put("int16", (short) 12) + .put("int32", 12) + .put("int64", (long) 12) + .put("float32", 12.f) + 
.put("float64", 12.) + .put("boolean", true) + .put("string", "foobar") + .put("bytes", ByteBuffer.wrap("foobar".getBytes())); + Struct struct2 = new Struct(FLAT_STRUCT_SCHEMA) + .put("int8", (byte) 12) + .put("int16", (short) 12) + .put("int32", 12) + .put("int64", (long) 12) + .put("float32", 12.f) + .put("float64", 12.) + .put("boolean", true) + .put("string", "foobar") + .put("bytes", ByteBuffer.wrap("foobar".getBytes())); + Struct struct3 = new Struct(FLAT_STRUCT_SCHEMA) + .put("int8", (byte) 12) + .put("int16", (short) 12) + .put("int32", 12) + .put("int64", (long) 12) + .put("float32", 12.f) + .put("float64", 12.) + .put("boolean", true) + .put("string", "mismatching string") + .put("bytes", ByteBuffer.wrap("foobar".getBytes())); + + assertEquals(struct1, struct2); + assertNotEquals(struct1, struct3); + + List<Byte> array = Arrays.asList((byte) 1, (byte) 2); + Map<Integer, String> map = Collections.singletonMap(1, "string"); + struct1 = new Struct(NESTED_SCHEMA) + .put("array", array) + .put("map", map) + .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", (byte) 12)); + List<Byte> array2 = Arrays.asList((byte) 1, (byte) 2); + Map<Integer, String> map2 = Collections.singletonMap(1, "string"); + struct2 = new Struct(NESTED_SCHEMA) + .put("array", array2) + .put("map", map2) + .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", (byte) 12)); + List<Byte> array3 = Arrays.asList((byte) 1, (byte) 2, (byte) 3); + Map<Integer, String> map3 = Collections.singletonMap(2, "string"); + struct3 = new Struct(NESTED_SCHEMA) + .put("array", array3) + .put("map", map3) + .put("nested", new Struct(NESTED_CHILD_SCHEMA).put("int8", (byte) 13)); + + assertEquals(struct1, struct2); + assertNotEquals(struct1, struct3); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/test/java/org/apache/kafka/connect/data/TimeTest.java ---------------------------------------------------------------------- diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/TimeTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/TimeTest.java new file mode 100644 index 0000000..45bdc4e --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/TimeTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.data; + +import org.apache.kafka.connect.errors.DataException; +import org.junit.Test; + +import java.util.Calendar; +import java.util.GregorianCalendar; +import java.util.TimeZone; + +import static org.junit.Assert.assertEquals; + +public class TimeTest { + private static final GregorianCalendar EPOCH; + private static final GregorianCalendar EPOCH_PLUS_DATE_COMPONENT; + private static final GregorianCalendar EPOCH_PLUS_TEN_THOUSAND_MILLIS; + static { + EPOCH = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); + EPOCH.setTimeZone(TimeZone.getTimeZone("UTC")); + + EPOCH_PLUS_TEN_THOUSAND_MILLIS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); + EPOCH_PLUS_TEN_THOUSAND_MILLIS.setTimeZone(TimeZone.getTimeZone("UTC")); + EPOCH_PLUS_TEN_THOUSAND_MILLIS.add(Calendar.MILLISECOND, 10000); + + + EPOCH_PLUS_DATE_COMPONENT = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); + EPOCH_PLUS_DATE_COMPONENT.setTimeZone(TimeZone.getTimeZone("UTC")); + EPOCH_PLUS_DATE_COMPONENT.add(Calendar.DATE, 10000); + } + + @Test + public void testBuilder() { + Schema plain = Time.SCHEMA; + assertEquals(Time.LOGICAL_NAME, plain.name()); + assertEquals(1, (Object) plain.version()); + } + + @Test + public void testFromLogical() { + assertEquals(0, Time.fromLogical(Time.SCHEMA, EPOCH.getTime())); + assertEquals(10000, Time.fromLogical(Time.SCHEMA, EPOCH_PLUS_TEN_THOUSAND_MILLIS.getTime())); + } + + @Test(expected = DataException.class) + public void testFromLogicalInvalidSchema() { + Time.fromLogical(Time.builder().name("invalid").build(), EPOCH.getTime()); + } + + @Test(expected = DataException.class) + public void testFromLogicalInvalidHasDateComponents() { + Time.fromLogical(Time.SCHEMA, EPOCH_PLUS_DATE_COMPONENT.getTime()); + } + + @Test + public void testToLogical() { + assertEquals(EPOCH.getTime(), Time.toLogical(Time.SCHEMA, 0)); + assertEquals(EPOCH_PLUS_TEN_THOUSAND_MILLIS.getTime(), Time.toLogical(Time.SCHEMA, 10000)); + } + + @Test(expected = DataException.class) + public void testToLogicalInvalidSchema() { + Time.toLogical(Time.builder().name("invalid").build(), 0); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/test/java/org/apache/kafka/connect/data/TimestampTest.java ---------------------------------------------------------------------- diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/TimestampTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/TimestampTest.java new file mode 100644 index 0000000..6121160 --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/TimestampTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.errors.DataException;
+import org.junit.Test;
+
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+
+public class TimestampTest {
+    private static final GregorianCalendar EPOCH;
+    private static final GregorianCalendar EPOCH_PLUS_MILLIS;
+
+    private static final int NUM_MILLIS = 2000000000;
+    private static final long TOTAL_MILLIS = ((long) NUM_MILLIS) * 2;
+
+    static {
+        EPOCH = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        EPOCH.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        EPOCH_PLUS_MILLIS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        EPOCH_PLUS_MILLIS.setTimeZone(TimeZone.getTimeZone("UTC"));
+        EPOCH_PLUS_MILLIS.add(Calendar.MILLISECOND, NUM_MILLIS);
+        EPOCH_PLUS_MILLIS.add(Calendar.MILLISECOND, NUM_MILLIS);
+    }
+
+    @Test
+    public void testBuilder() {
+        Schema plain = Timestamp.SCHEMA;
+        assertEquals(Timestamp.LOGICAL_NAME, plain.name());
+        assertEquals(1, (Object) plain.version());
+    }
+
+    @Test
+    public void testFromLogical() {
+        assertEquals(0L, Timestamp.fromLogical(Timestamp.SCHEMA, EPOCH.getTime()));
+        assertEquals(TOTAL_MILLIS, Timestamp.fromLogical(Timestamp.SCHEMA, EPOCH_PLUS_MILLIS.getTime()));
+    }
+
+    @Test(expected = DataException.class)
+    public void testFromLogicalInvalidSchema() {
+        Timestamp.fromLogical(Timestamp.builder().name("invalid").build(), EPOCH.getTime());
+    }
+
+    @Test
+    public void testToLogical() {
+        assertEquals(EPOCH.getTime(), Timestamp.toLogical(Timestamp.SCHEMA, 0L));
+        assertEquals(EPOCH_PLUS_MILLIS.getTime(), Timestamp.toLogical(Timestamp.SCHEMA, TOTAL_MILLIS));
+    }
+
+    @Test(expected = DataException.class)
+    public void testToLogicalInvalidSchema() {
+        Timestamp.toLogical(Timestamp.builder().name("invalid").build(), 0L);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java
new file mode 100644
index 0000000..017b2d3
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/ + +package org.apache.kafka.connect.storage; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.junit.Test; + +import java.io.UnsupportedEncodingException; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertArrayEquals; + +public class StringConverterTest { + private static final String TOPIC = "topic"; + private static final String SAMPLE_STRING = "a string"; + + private StringConverter converter = new StringConverter(); + + @Test + public void testStringToBytes() throws UnsupportedEncodingException { + assertArrayEquals(SAMPLE_STRING.getBytes("UTF8"), converter.fromConnectData(TOPIC, Schema.STRING_SCHEMA, SAMPLE_STRING)); + } + + @Test + public void testNonStringToBytes() throws UnsupportedEncodingException { + assertArrayEquals("true".getBytes("UTF8"), converter.fromConnectData(TOPIC, Schema.BOOLEAN_SCHEMA, true)); + } + + @Test + public void testNullToBytes() { + assertEquals(null, converter.fromConnectData(TOPIC, Schema.OPTIONAL_STRING_SCHEMA, null)); + } + + @Test + public void testToBytesIgnoresSchema() throws UnsupportedEncodingException { + assertArrayEquals("true".getBytes("UTF8"), converter.fromConnectData(TOPIC, null, true)); + } + + @Test + public void testToBytesNonUtf8Encoding() throws UnsupportedEncodingException { + converter.configure(Collections.singletonMap("converter.encoding", "UTF-16"), true); + assertArrayEquals(SAMPLE_STRING.getBytes("UTF-16"), converter.fromConnectData(TOPIC, Schema.STRING_SCHEMA, SAMPLE_STRING)); + } + + @Test + public void testBytesToString() { + SchemaAndValue data = converter.toConnectData(TOPIC, SAMPLE_STRING.getBytes()); + assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema()); + assertEquals(SAMPLE_STRING, data.value()); + } + + @Test + public void testBytesNullToString() { + SchemaAndValue data = converter.toConnectData(TOPIC, null); + assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema()); + assertEquals(null, data.value()); + } + + @Test + public void testBytesToStringNonUtf8Encoding() throws UnsupportedEncodingException { + converter.configure(Collections.singletonMap("converter.encoding", "UTF-16"), true); + SchemaAndValue data = converter.toConnectData(TOPIC, SAMPLE_STRING.getBytes("UTF-16")); + assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema()); + assertEquals(SAMPLE_STRING, data.value()); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/test/java/org/apache/kafka/connect/util/ConnectorUtilsTest.java ---------------------------------------------------------------------- diff --git a/connect/api/src/test/java/org/apache/kafka/connect/util/ConnectorUtilsTest.java b/connect/api/src/test/java/org/apache/kafka/connect/util/ConnectorUtilsTest.java new file mode 100644 index 0000000..9f8338d --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/util/ConnectorUtilsTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.util; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class ConnectorUtilsTest { + + private static final List<Integer> FIVE_ELEMENTS = Arrays.asList(1, 2, 3, 4, 5); + + @Test + public void testGroupPartitions() { + + List<List<Integer>> grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 1); + assertEquals(Arrays.asList(FIVE_ELEMENTS), grouped); + + grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 2); + assertEquals(Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5)), grouped); + + grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 3); + assertEquals(Arrays.asList(Arrays.asList(1, 2), + Arrays.asList(3, 4), + Arrays.asList(5)), grouped); + + grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 5); + assertEquals(Arrays.asList(Arrays.asList(1), + Arrays.asList(2), + Arrays.asList(3), + Arrays.asList(4), + Arrays.asList(5)), grouped); + + grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 7); + assertEquals(Arrays.asList(Arrays.asList(1), + Arrays.asList(2), + Arrays.asList(3), + Arrays.asList(4), + Arrays.asList(5), + Collections.EMPTY_LIST, + Collections.EMPTY_LIST), grouped); + } + + @Test(expected = IllegalArgumentException.class) + public void testGroupPartitionsInvalidCount() { + ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 0); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java ---------------------------------------------------------------------- diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java new file mode 100644 index 0000000..a73153f --- /dev/null +++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/
+
+package org.apache.kafka.connect.file;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Very simple sink connector that hands records to FileStreamSinkTask, which writes them to a
+ * file, or to the console when no file is configured.
+ */
+public class FileStreamSinkConnector extends SinkConnector {
+    public static final String FILE_CONFIG = "file";
+
+    private String filename;
+
+    @Override
+    public String version() {
+        return AppInfoParser.getVersion();
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        filename = props.get(FILE_CONFIG);
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return FileStreamSinkTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        ArrayList<Map<String, String>> configs = new ArrayList<>();
+        for (int i = 0; i < maxTasks; i++) {
+            Map<String, String> config = new HashMap<>();
+            if (filename != null)
+                config.put(FILE_CONFIG, filename);
+            configs.add(config);
+        }
+        return configs;
+    }
+
+    @Override
+    public void stop() {
+        // Nothing to do since FileStreamSinkConnector has no background monitoring.
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
new file mode 100644
index 0000000..83ba6d4
--- /dev/null
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.file;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * FileStreamSinkTask writes records to stdout or a file.
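+ * Each record's value is written out as one line via println(); a configured file is opened in
+ * append mode, and standard output is used when no file is given.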
+ */
+public class FileStreamSinkTask extends SinkTask {
+    private static final Logger log = LoggerFactory.getLogger(FileStreamSinkTask.class);
+
+    private String filename;
+    private PrintStream outputStream;
+
+    public FileStreamSinkTask() {
+    }
+
+    // for testing
+    public FileStreamSinkTask(PrintStream outputStream) {
+        filename = null;
+        this.outputStream = outputStream;
+    }
+
+    @Override
+    public String version() {
+        return new FileStreamSinkConnector().version();
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        filename = props.get(FileStreamSinkConnector.FILE_CONFIG);
+        if (filename == null) {
+            outputStream = System.out;
+        } else {
+            try {
+                outputStream = new PrintStream(new FileOutputStream(filename, true));
+            } catch (FileNotFoundException e) {
+                throw new ConnectException("Couldn't find or create file for FileStreamSinkTask", e);
+            }
+        }
+    }
+
+    @Override
+    public void put(Collection<SinkRecord> sinkRecords) {
+        for (SinkRecord record : sinkRecords) {
+            log.trace("Writing line to {}: {}", logFilename(), record.value());
+            outputStream.println(record.value());
+        }
+    }
+
+    @Override
+    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        log.trace("Flushing output stream for {}", logFilename());
+        outputStream.flush();
+    }
+
+    @Override
+    public void stop() {
+        if (outputStream != System.out)
+            outputStream.close();
+    }
+
+    private String logFilename() {
+        return filename == null ? "stdout" : filename;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
new file mode 100644
index 0000000..843e999
--- /dev/null
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.file;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceConnector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Very simple source connector that reads lines from a file, or from standard input when no file
+ * is configured, and publishes them to a single Kafka topic.
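+ *
+ * A minimal standalone configuration sketch ('file' and 'topic' are this connector's own keys;
+ * 'name', 'connector.class' and 'tasks.max' are standard Connect worker keys, shown here only
+ * for illustration):
+ *
+ *   name=local-file-source
+ *   connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
+ *   tasks.max=1
+ *   file=/tmp/test.txt
+ *   topic=connect-test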
+ */ +public class FileStreamSourceConnector extends SourceConnector { + public static final String TOPIC_CONFIG = "topic"; + public static final String FILE_CONFIG = "file"; + + private String filename; + private String topic; + + @Override + public String version() { + return AppInfoParser.getVersion(); + } + + @Override + public void start(Map<String, String> props) { + filename = props.get(FILE_CONFIG); + topic = props.get(TOPIC_CONFIG); + if (topic == null || topic.isEmpty()) + throw new ConnectException("FileStreamSourceConnector configuration must include 'topic' setting"); + if (topic.contains(",")) + throw new ConnectException("FileStreamSourceConnector should only have a single topic when used as a source."); + } + + @Override + public Class<? extends Task> taskClass() { + return FileStreamSourceTask.class; + } + + @Override + public List<Map<String, String>> taskConfigs(int maxTasks) { + ArrayList<Map<String, String>> configs = new ArrayList<>(); + // Only one input stream makes sense. + Map<String, String> config = new HashMap<>(); + if (filename != null) + config.put(FILE_CONFIG, filename); + config.put(TOPIC_CONFIG, topic); + configs.add(config); + return configs; + } + + @Override + public void stop() { + // Nothing to do since FileStreamSourceConnector has no background monitoring. + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java ---------------------------------------------------------------------- diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java new file mode 100644 index 0000000..f30b603 --- /dev/null +++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.file; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.*; + +/** + * FileStreamSourceTask reads from stdin or a file. 
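+ * When reading from a file, the task records its byte position under the "position" offset key,
+ * so that a restarted task can skip forward and resume where it left off; no offset is tracked
+ * when reading from stdin.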
+ */ +public class FileStreamSourceTask extends SourceTask { + private static final Logger log = LoggerFactory.getLogger(FileStreamSourceTask.class); + public static final String FILENAME_FIELD = "filename"; + public static final String POSITION_FIELD = "position"; + private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA; + + private String filename; + private InputStream stream; + private BufferedReader reader = null; + private char[] buffer = new char[1024]; + private int offset = 0; + private String topic = null; + + private Long streamOffset; + + @Override + public String version() { + return new FileStreamSourceConnector().version(); + } + + @Override + public void start(Map<String, String> props) { + filename = props.get(FileStreamSourceConnector.FILE_CONFIG); + if (filename == null || filename.isEmpty()) { + stream = System.in; + // Tracking offset for stdin doesn't make sense + streamOffset = null; + reader = new BufferedReader(new InputStreamReader(stream)); + } + topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG); + if (topic == null) + throw new ConnectException("FileStreamSourceTask config missing topic setting"); + } + + @Override + public List<SourceRecord> poll() throws InterruptedException { + if (stream == null) { + try { + stream = new FileInputStream(filename); + Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename)); + if (offset != null) { + Object lastRecordedOffset = offset.get(POSITION_FIELD); + if (lastRecordedOffset != null && !(lastRecordedOffset instanceof Long)) + throw new ConnectException("Offset position is the incorrect type"); + if (lastRecordedOffset != null) { + log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset); + long skipLeft = (Long) lastRecordedOffset; + while (skipLeft > 0) { + try { + long skipped = stream.skip(skipLeft); + skipLeft -= skipped; + } catch (IOException e) { + log.error("Error while trying to seek to previous offset in file: ", e); + throw new ConnectException(e); + } + } + log.debug("Skipped to offset {}", lastRecordedOffset); + } + streamOffset = (lastRecordedOffset != null) ? (Long) lastRecordedOffset : 0L; + } else { + streamOffset = 0L; + } + reader = new BufferedReader(new InputStreamReader(stream)); + log.debug("Opened {} for reading", logFilename()); + } catch (FileNotFoundException e) { + log.warn("Couldn't find file for FileStreamSourceTask, sleeping to wait for it to be created"); + synchronized (this) { + this.wait(1000); + } + return null; + } + } + + // Unfortunately we can't just use readLine() because it blocks in an uninterruptible way. + // Instead we have to manage splitting lines ourselves, using simple backoff when no new data + // is available. 
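+        // For example, if a read leaves the buffer holding "foo\nba", extractLine() returns "foo",
+        // shifts the leftover "ba" to the front of the buffer, and later reads append to it until
+        // the terminating \n (or \r / \r\n) arrives.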
+ try { + final BufferedReader readerCopy; + synchronized (this) { + readerCopy = reader; + } + if (readerCopy == null) + return null; + + ArrayList<SourceRecord> records = null; + + int nread = 0; + while (readerCopy.ready()) { + nread = readerCopy.read(buffer, offset, buffer.length - offset); + log.trace("Read {} bytes from {}", nread, logFilename()); + + if (nread > 0) { + offset += nread; + if (offset == buffer.length) { + char[] newbuf = new char[buffer.length * 2]; + System.arraycopy(buffer, 0, newbuf, 0, buffer.length); + buffer = newbuf; + } + + String line; + do { + line = extractLine(); + if (line != null) { + log.trace("Read a line from {}", logFilename()); + if (records == null) + records = new ArrayList<>(); + records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, VALUE_SCHEMA, line)); + } + } while (line != null); + } + } + + if (nread <= 0) + synchronized (this) { + this.wait(1000); + } + + return records; + } catch (IOException e) { + // Underlying stream was killed, probably as a result of calling stop. Allow to return + // null, and driving thread will handle any shutdown if necessary. + } + return null; + } + + // Finds the next complete line in the buffer, removes it, and shifts any remaining chars + // to the front of the buffer. + private String extractLine() { + int until = -1, newStart = -1; + for (int i = 0; i < offset; i++) { + if (buffer[i] == '\n') { + until = i; + newStart = i + 1; + break; + } else if (buffer[i] == '\r') { + // We need to check for \r\n, so we must skip this if we can't check the next char + if (i + 1 >= offset) + return null; + + until = i; + newStart = (buffer[i + 1] == '\n') ? i + 2 : i + 1; + break; + } + } + + if (until != -1) { + String result = new String(buffer, 0, until); + System.arraycopy(buffer, newStart, buffer, 0, buffer.length - newStart); + offset = offset - newStart; + // Advance past the delimiter as well, so the stored offset points at the next unread byte. + if (streamOffset != null) + streamOffset += newStart; + return result; + } else { + return null; + } + } + + @Override + public void stop() { + log.trace("Stopping"); + synchronized (this) { + try { + if (stream != null && stream != System.in) { + stream.close(); + log.trace("Closed input stream"); + } + } catch (IOException e) { + log.error("Failed to close FileStreamSourceTask stream: ", e); + } + this.notify(); + } + } + + private Map<String, String> offsetKey(String filename) { + return Collections.singletonMap(FILENAME_FIELD, filename); + } + + private Map<String, Long> offsetValue(Long pos) { + return Collections.singletonMap(POSITION_FIELD, pos); + } + + private String logFilename() { + return filename == null ? "stdin" : filename; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java ---------------------------------------------------------------------- diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java new file mode 100644 index 0000000..5ed03f4 --- /dev/null +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.file; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.sink.SinkConnector; +import org.junit.Before; +import org.junit.Test; +import org.powermock.api.easymock.PowerMock; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class FileStreamSinkConnectorTest { + + private static final String MULTIPLE_TOPICS = "test1,test2"; + private static final String[] MULTIPLE_TOPICS_LIST + = MULTIPLE_TOPICS.split(","); + private static final List<TopicPartition> MULTIPLE_TOPICS_PARTITIONS = Arrays.asList( + new TopicPartition("test1", 1), new TopicPartition("test2", 2) + ); + private static final String FILENAME = "/afilename"; + + private FileStreamSinkConnector connector; + private ConnectorContext ctx; + private Map<String, String> sinkProperties; + + @Before + public void setup() { + connector = new FileStreamSinkConnector(); + ctx = PowerMock.createMock(ConnectorContext.class); + connector.initialize(ctx); + + sinkProperties = new HashMap<>(); + sinkProperties.put(SinkConnector.TOPICS_CONFIG, MULTIPLE_TOPICS); + sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, FILENAME); + } + + @Test + public void testSinkTasks() { + PowerMock.replayAll(); + + connector.start(sinkProperties); + List<Map<String, String>> taskConfigs = connector.taskConfigs(1); + assertEquals(1, taskConfigs.size()); + assertEquals(FILENAME, taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG)); + + taskConfigs = connector.taskConfigs(2); + assertEquals(2, taskConfigs.size()); + for (int i = 0; i < 2; i++) { + assertEquals(FILENAME, taskConfigs.get(i).get(FileStreamSinkConnector.FILE_CONFIG)); + } + + PowerMock.verifyAll(); + } + + @Test + public void testTaskClass() { + PowerMock.replayAll(); + + connector.start(sinkProperties); + assertEquals(FileStreamSinkTask.class, connector.taskClass()); + + PowerMock.verifyAll(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java new file mode 100644 index 0000000..754e7f5 --- /dev/null +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.file; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.Arrays; +import java.util.HashMap; + +import static org.junit.Assert.assertEquals; + +public class FileStreamSinkTaskTest { + + private FileStreamSinkTask task; + private ByteArrayOutputStream os; + private PrintStream printStream; + + @Before + public void setup() { + os = new ByteArrayOutputStream(); + printStream = new PrintStream(os); + task = new FileStreamSinkTask(printStream); + } + + @Test + public void testPutFlush() { + HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); + + // We do not call task.start() since it would override the output stream + + task.put(Arrays.asList( + new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line1", 1) + )); + offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(1L)); + task.flush(offsets); + assertEquals("line1\n", os.toString()); + + task.put(Arrays.asList( + new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line2", 2), + new SinkRecord("topic2", 0, null, null, Schema.STRING_SCHEMA, "line3", 1) + )); + offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(2L)); + offsets.put(new TopicPartition("topic2", 0), new OffsetAndMetadata(1L)); + task.flush(offsets); + assertEquals("line1\nline2\nline3\n", os.toString()); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java ---------------------------------------------------------------------- diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java new file mode 100644 index 0000000..80ff7f5 --- /dev/null +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.file; + +import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.errors.ConnectException; +import org.junit.Before; +import org.junit.Test; +import org.powermock.api.easymock.PowerMock; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class FileStreamSourceConnectorTest { + + private static final String SINGLE_TOPIC = "test"; + private static final String MULTIPLE_TOPICS = "test1,test2"; + private static final String FILENAME = "/somefilename"; + + private FileStreamSourceConnector connector; + private ConnectorContext ctx; + private Map<String, String> sourceProperties; + + @Before + public void setup() { + connector = new FileStreamSourceConnector(); + ctx = PowerMock.createMock(ConnectorContext.class); + connector.initialize(ctx); + + sourceProperties = new HashMap<>(); + sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, SINGLE_TOPIC); + sourceProperties.put(FileStreamSourceConnector.FILE_CONFIG, FILENAME); + } + + @Test + public void testSourceTasks() { + PowerMock.replayAll(); + + connector.start(sourceProperties); + List<Map<String, String>> taskConfigs = connector.taskConfigs(1); + assertEquals(1, taskConfigs.size()); + assertEquals(FILENAME, + taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG)); + assertEquals(SINGLE_TOPIC, + taskConfigs.get(0).get(FileStreamSourceConnector.TOPIC_CONFIG)); + + // Should be able to return fewer than requested # + taskConfigs = connector.taskConfigs(2); + assertEquals(1, taskConfigs.size()); + assertEquals(FILENAME, + taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG)); + assertEquals(SINGLE_TOPIC, + taskConfigs.get(0).get(FileStreamSourceConnector.TOPIC_CONFIG)); + + PowerMock.verifyAll(); + } + + @Test + public void testSourceTasksStdin() { + PowerMock.replayAll(); + + sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG); + connector.start(sourceProperties); + List<Map<String, String>> taskConfigs = connector.taskConfigs(1); + assertEquals(1, taskConfigs.size()); + assertNull(taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG)); + + PowerMock.verifyAll(); + } + + @Test(expected = ConnectException.class) + public void testMultipleSourcesInvalid() { + sourceProperties.put(FileStreamSourceConnector.TOPIC_CONFIG, MULTIPLE_TOPICS); + connector.start(sourceProperties); + } + + @Test + public void testTaskClass() { + PowerMock.replayAll(); + + connector.start(sourceProperties); + assertEquals(FileStreamSourceTask.class, connector.taskClass()); + + PowerMock.verifyAll(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java new file mode 100644 index 0000000..3689313 --- /dev/null +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.file; + +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTaskContext; +import org.apache.kafka.connect.storage.OffsetStorageReader; +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.powermock.api.easymock.PowerMock; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class FileStreamSourceTaskTest { + + private static final String TOPIC = "test"; + + private File tempFile; + private Map<String, String> config; + private OffsetStorageReader offsetStorageReader; + private SourceTaskContext context; + private FileStreamSourceTask task; + + private boolean verifyMocks = false; + + @Before + public void setup() throws IOException { + tempFile = File.createTempFile("file-stream-source-task-test", null); + config = new HashMap<>(); + config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath()); + config.put(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC); + task = new FileStreamSourceTask(); + offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class); + context = PowerMock.createMock(SourceTaskContext.class); + task.initialize(context); + } + + @After + public void teardown() { + tempFile.delete(); + + if (verifyMocks) + PowerMock.verifyAll(); + } + + private void replay() { + PowerMock.replayAll(); + verifyMocks = true; + } + + @Test + public void testNormalLifecycle() throws InterruptedException, IOException { + expectOffsetLookupReturnNone(); + replay(); + + task.start(config); + + FileOutputStream os = new FileOutputStream(tempFile); + assertEquals(null, task.poll()); + os.write("partial line".getBytes()); + os.flush(); + assertEquals(null, task.poll()); + os.write(" finished\n".getBytes()); + os.flush(); + List<SourceRecord> records = task.poll(); + assertEquals(1, records.size()); + assertEquals(TOPIC, records.get(0).topic()); + assertEquals("partial line finished", records.get(0).value()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 22L), records.get(0).sourceOffset()); + assertEquals(null, task.poll()); + + // Different line endings, and make sure the final \r doesn't result in a line until we can + // read the subsequent byte. 
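+ // The offsets asserted below count delimiter bytes too: starting from 22, "line1\r" advances + // the position to 28, "line2\r\n" to 35, "line3\n" to 41 and "line4\n" to 47. The trailing + // '\r' is held back until the next write shows it is not the first half of a "\r\n" pair, at + // which point it yields an empty line at offset 48.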
+ os.write("line1\rline2\r\nline3\nline4\n\r".getBytes()); + os.flush(); + records = task.poll(); + assertEquals(4, records.size()); + assertEquals("line1", records.get(0).value()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 28L), records.get(0).sourceOffset()); + assertEquals("line2", records.get(1).value()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(1).sourcePartition()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 35L), records.get(1).sourceOffset()); + assertEquals("line3", records.get(2).value()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(2).sourcePartition()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 41L), records.get(2).sourceOffset()); + assertEquals("line4", records.get(3).value()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(3).sourcePartition()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 47L), records.get(3).sourceOffset()); + + os.write("subsequent text".getBytes()); + os.flush(); + records = task.poll(); + assertEquals(1, records.size()); + assertEquals("", records.get(0).value()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 48L), records.get(0).sourceOffset()); + + task.stop(); + } + + @Test(expected = ConnectException.class) + public void testMissingTopic() throws InterruptedException { + replay(); + + config.remove(FileStreamSourceConnector.TOPIC_CONFIG); + task.start(config); + } + + public void testInvalidFile() throws InterruptedException { + config.put(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename"); + task.start(config); + // Currently the task retries indefinitely if the file isn't found, but shouldn't return any data. + for (int i = 0; i < 100; i++) + assertEquals(null, task.poll()); + } + + + private void expectOffsetLookupReturnNone() { + EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader); + EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject(Map.class))).andReturn(null); + } +} \ No newline at end of file