http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java new file mode 100644 index 0000000..1890062 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.source; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; + +import java.util.Map; + +/** + * <p> + * SourceRecords are generated by SourceTasks and passed to Kafka Connect for storage in + * Kafka. In addition to the standard fields in {@link ConnectRecord} which specify where data is stored + * in Kafka, they also include a sourcePartition and sourceOffset. + * </p> + * <p> + * The sourcePartition represents a single input sourcePartition that the record came from (e.g. 
a filename, table + * name, or topic-partition). The sourceOffset represents a position in that sourcePartition which can be used + * to resume consumption of data. + * </p> + * <p> + * These values can have arbitrary structure and should be represented using + * org.apache.kafka.connect.data objects (or primitive values). For example, a database connector + * might specify the sourcePartition as a record containing { "db": "database_name", "table": + * "table_name"} and the sourceOffset as a Long containing the timestamp of the row. + * </p> + */ +@InterfaceStability.Unstable +public class SourceRecord extends ConnectRecord { + private final Map<String, ?> sourcePartition; + private final Map<String, ?> sourceOffset; + + public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, + String topic, Integer partition, Schema valueSchema, Object value) { + this(sourcePartition, sourceOffset, topic, partition, null, null, valueSchema, value); + } + + public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, + String topic, Schema valueSchema, Object value) { + this(sourcePartition, sourceOffset, topic, null, null, null, valueSchema, value); + } + + public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, + String topic, Integer partition, + Schema keySchema, Object key, Schema valueSchema, Object value) { + super(topic, partition, keySchema, key, valueSchema, value); + this.sourcePartition = sourcePartition; + this.sourceOffset = sourceOffset; + } + + public Map<String, ?> sourcePartition() { + return sourcePartition; + } + + public Map<String, ?> sourceOffset() { + return sourceOffset; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + if (!super.equals(o)) + return false; + + SourceRecord that = (SourceRecord) o; + + if (sourcePartition != null ? 
!sourcePartition.equals(that.sourcePartition) : that.sourcePartition != null) + return false; + if (sourceOffset != null ? !sourceOffset.equals(that.sourceOffset) : that.sourceOffset != null) + return false; + + return true; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (sourcePartition != null ? sourcePartition.hashCode() : 0); + result = 31 * result + (sourceOffset != null ? sourceOffset.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "SourceRecord{" + + "sourcePartition=" + sourcePartition + + ", sourceOffset=" + sourceOffset + + "} " + super.toString(); + } +}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java new file mode 100644 index 0000000..5110504 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.source; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.connect.connector.Task; + +import java.util.List; +import java.util.Map; + +/** + * SourceTask is a Task that pulls records from another system for storage in Kafka. + */ +@InterfaceStability.Unstable +public abstract class SourceTask implements Task { + + protected SourceTaskContext context; + + /** + * Initialize this SourceTask with the specified context object. + */ + public void initialize(SourceTaskContext context) { + this.context = context; + } + + /** + * Start the Task. 
This should handle any configuration parsing and one-time setup of the task. + * @param props initial configuration + */ + @Override + public abstract void start(Map<String, String> props); + + /** + * Poll this SourceTask for new records. This method should block if no data is currently + * available. + * + * @return a list of source records + */ + public abstract List<SourceRecord> poll() throws InterruptedException; + + /** + * <p> + * Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This + * method should block until the commit is complete. + * </p> + * <p> + * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets + * automatically. This hook is provided for systems that also need to store offsets internally + * in their own system. + * </p> + */ + public void commit() throws InterruptedException { + // This space intentionally left blank. + } + + /** + * Signal this SourceTask to stop. In SourceTasks, this method only needs to signal to the task that it should stop + * trying to poll for new data and interrupt any outstanding poll() requests. It is not required that the task has + * fully stopped. Note that this method necessarily may be invoked from a different thread than {@link #poll()} and + * {@link #commit()}. + * + * For example, if a task uses a {@link java.nio.channels.Selector} to receive data over the network, this method + * could set a flag that will force {@link #poll()} to exit immediately and invoke + * {@link java.nio.channels.Selector#wakeup()} to interrupt any ongoing requests. 
+ */ + public abstract void stop(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java new file mode 100644 index 0000000..200fa5f --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.connect.source; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.connect.storage.OffsetStorageReader; + +/** + * SourceTaskContext is provided to SourceTasks to allow them to interact with the underlying + * runtime. + */ +@InterfaceStability.Unstable +public interface SourceTaskContext { + /** + * Get the OffsetStorageReader for this SourceTask. 
+ */ + OffsetStorageReader offsetStorageReader(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java new file mode 100644 index 0000000..158ddb1 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.storage; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; + +import java.util.Map; + +/** + * The Converter interface provides support for translating between Kafka Connect's runtime data format + * and byte[]. Internally, this likely includes an intermediate step to the format used by the serialization + * layer (e.g. JsonNode, GenericRecord, Message). + */ +@InterfaceStability.Unstable +public interface Converter { + + /** + * Configure this class. 
+ * @param configs configs in key/value pairs + * @param isKey whether this converter is for a key or a value + */ + void configure(Map<String, ?> configs, boolean isKey); + + /** + * Convert a Kafka Connect data object to a native object for serialization. + * @param topic the topic associated with the data + * @param schema the schema for the value + * @param value the value to convert + * @return the converted value as a byte array + */ + byte[] fromConnectData(String topic, Schema schema, Object value); + + /** + * Convert a native object to a Kafka Connect data object. + * @param topic the topic associated with the data + * @param value the value to convert + * @return an object containing the {@link Schema} and the converted value + */ + SchemaAndValue toConnectData(String topic, byte[] value); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReader.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReader.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReader.java new file mode 100644 index 0000000..9307c23 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReader.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.storage; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Collection; +import java.util.Map; + +/** + * <p> + * OffsetStorageReader provides access to the offset storage used by sources. This can be used by + * connectors to determine offsets to start consuming data from. This is most commonly used during + * initialization of a task, but can also be used during runtime, e.g. when reconfiguring a task. + * </p> + * <p> + * Offsets are always defined as Maps of Strings to primitive types, i.e. all types supported by + * {@link org.apache.kafka.connect.data.Schema} other than Array, Map, and Struct. + * </p> + */ +@InterfaceStability.Unstable +public interface OffsetStorageReader { + /** + * Get the offset for the specified partition. If the data isn't already available locally, this + * gets it from the backing store, which may require some network round trips. + * + * @param partition object uniquely identifying the partition of data + * @return object uniquely identifying the offset in the partition of data + */ + <T> Map<String, Object> offset(Map<String, T> partition); + + /** + * <p> + * Get a set of offsets for the specified partition identifiers. This may be more efficient + * than calling {@link #offset(Map)} repeatedly. + * </p> + * <p> + * Note that when errors occur, this method omits the associated data and tries to return as + * many of the requested values as possible. 
This allows a task that's managing many partitions to + * still proceed with any available data. Therefore, implementations should take care to check + * that the data is actually available in the returned response. The only case when an + * exception will be thrown is if the entire request failed, e.g. because the underlying + * storage was unavailable. + * </p> + * + * @param partitions set of identifiers for partitions of data + * @return a map of partition identifiers to decoded offsets + */ + <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java new file mode 100644 index 0000000..5859f18 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.storage; + +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.errors.DataException; + +import java.util.HashMap; +import java.util.Map; + +/** + * {@link Converter} implementation that only supports serializing to strings. When converting Kafka Connect data to bytes, + * the schema will be ignored and {@link Object#toString()} will always be invoked to convert the data to a String. + * When converting from bytes to Kafka Connect format, the converter will only ever return an optional string schema and + * a string or null. + * + * Encoding configuration is identical to {@link StringSerializer} and {@link StringDeserializer}, but for convenience + * this class can also be configured to use the same encoding for both encoding and decoding with the converter.encoding + * setting. 
+ */ +public class StringConverter implements Converter { + private final StringSerializer serializer = new StringSerializer(); + private final StringDeserializer deserializer = new StringDeserializer(); + + public StringConverter() { + } + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + Map<String, Object> serializerConfigs = new HashMap<>(); + serializerConfigs.putAll(configs); + Map<String, Object> deserializerConfigs = new HashMap<>(); + deserializerConfigs.putAll(configs); + + Object encodingValue = configs.get("converter.encoding"); + if (encodingValue != null) { + serializerConfigs.put("serializer.encoding", encodingValue); + deserializerConfigs.put("deserializer.encoding", encodingValue); + } + + serializer.configure(serializerConfigs, isKey); + deserializer.configure(deserializerConfigs, isKey); + } + + @Override + public byte[] fromConnectData(String topic, Schema schema, Object value) { + try { + return serializer.serialize(topic, value == null ? 
null : value.toString()); + } catch (SerializationException e) { + throw new DataException("Failed to serialize to a string: ", e); + } + } + + @Override + public SchemaAndValue toConnectData(String topic, byte[] value) { + try { + return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA, deserializer.deserialize(topic, value)); + } catch (SerializationException e) { + throw new DataException("Failed to deserialize string: ", e); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/util/ConnectorUtils.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/util/ConnectorUtils.java b/connect/api/src/main/java/org/apache/kafka/connect/util/ConnectorUtils.java new file mode 100644 index 0000000..35250eb --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/util/ConnectorUtils.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.util; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.ArrayList; +import java.util.List; + +/** + * Utilities that connector implementations might find useful. Contains common building blocks + * for writing connectors. + */ +@InterfaceStability.Unstable +public class ConnectorUtils { + /** + * Given a list of elements and a target number of groups, generates a list of groups of + * elements to match the target number of groups, spreading them evenly among the groups. + * This generates groups with contiguous elements, which results in intuitive ordering if + * your elements are also ordered (e.g. alphabetical lists of table names if you sort + * table names alphabetically to generate the raw partitions) or can result in efficient + * partitioning if elements are sorted according to some criterion that affects performance + * (e.g. topic partitions with the same leader). + * + * @param elements list of elements to partition + * @param numGroups the number of output groups to generate. + */ + public static <T> List<List<T>> groupPartitions(List<T> elements, int numGroups) { + if (numGroups <= 0) + throw new IllegalArgumentException("Number of groups must be positive."); + + List<List<T>> result = new ArrayList<>(numGroups); + + // Each group has either n+1 or n raw partitions + int perGroup = elements.size() / numGroups; + int leftover = elements.size() - (numGroups * perGroup); + + int assigned = 0; + for (int group = 0; group < numGroups; group++) { + int numThisGroup = group < leftover ? 
perGroup + 1 : perGroup; + List<T> groupList = new ArrayList<>(numThisGroup); + for (int i = 0; i < numThisGroup; i++) { + groupList.add(elements.get(assigned)); + assigned++; + } + result.add(groupList); + } + + return result; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java ---------------------------------------------------------------------- diff --git a/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java new file mode 100644 index 0000000..7ea1de2 --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.connector; + +import org.apache.kafka.connect.errors.ConnectException; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class ConnectorReconfigurationTest { + + @Test + public void testDefaultReconfigure() throws Exception { + TestConnector conn = new TestConnector(false); + conn.reconfigure(Collections.<String, String>emptyMap()); + assertEquals(conn.stopOrder, 0); + assertEquals(conn.configureOrder, 1); + } + + @Test(expected = ConnectException.class) + public void testReconfigureStopException() throws Exception { + TestConnector conn = new TestConnector(true); + conn.reconfigure(Collections.<String, String>emptyMap()); + } + + private static class TestConnector extends Connector { + private boolean stopException; + private int order = 0; + public int stopOrder = -1; + public int configureOrder = -1; + + public TestConnector(boolean stopException) { + this.stopException = stopException; + } + + @Override + public String version() { + return "1.0"; + } + + @Override + public void start(Map<String, String> props) { + configureOrder = order++; + } + + @Override + public Class<? 
extends Task> taskClass() { + return null; + } + + @Override + public List<Map<String, String>> taskConfigs(int count) { + return null; + } + + @Override + public void stop() { + stopOrder = order++; + if (stopException) + throw new ConnectException("error"); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java ---------------------------------------------------------------------- diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java new file mode 100644 index 0000000..4388ade --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.data; + +import org.apache.kafka.connect.errors.DataException; +import org.junit.Test; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; + +public class ConnectSchemaTest { + private static final Schema MAP_INT_STRING_SCHEMA = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build(); + private static final Schema FLAT_STRUCT_SCHEMA = SchemaBuilder.struct() + .field("field", Schema.INT32_SCHEMA) + .build(); + private static final Schema STRUCT_SCHEMA = SchemaBuilder.struct() + .field("first", Schema.INT32_SCHEMA) + .field("second", Schema.STRING_SCHEMA) + .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build()) + .field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build()) + .field("nested", FLAT_STRUCT_SCHEMA) + .build(); + private static final Schema PARENT_STRUCT_SCHEMA = SchemaBuilder.struct() + .field("nested", FLAT_STRUCT_SCHEMA) + .build(); + + @Test + public void testFieldsOnStructSchema() { + Schema schema = SchemaBuilder.struct() + .field("foo", Schema.BOOLEAN_SCHEMA) + .field("bar", Schema.INT32_SCHEMA) + .build(); + + assertEquals(2, schema.fields().size()); + // Validate field lookup by name + Field foo = schema.field("foo"); + assertEquals(0, foo.index()); + Field bar = schema.field("bar"); + assertEquals(1, bar.index()); + // Any other field name should fail + assertNull(schema.field("other")); + } + + + @Test(expected = DataException.class) + public void testFieldsOnlyValidForStructs() { + Schema.INT8_SCHEMA.fields(); + } + + @Test + public void testValidateValueMatchingType() { + ConnectSchema.validateValue(Schema.INT8_SCHEMA, (byte) 1); + 
ConnectSchema.validateValue(Schema.INT16_SCHEMA, (short) 1); + ConnectSchema.validateValue(Schema.INT32_SCHEMA, 1); + ConnectSchema.validateValue(Schema.INT64_SCHEMA, (long) 1); + ConnectSchema.validateValue(Schema.FLOAT32_SCHEMA, 1.f); + ConnectSchema.validateValue(Schema.FLOAT64_SCHEMA, 1.); + ConnectSchema.validateValue(Schema.BOOLEAN_SCHEMA, true); + ConnectSchema.validateValue(Schema.STRING_SCHEMA, "a string"); + ConnectSchema.validateValue(Schema.BYTES_SCHEMA, "a byte array".getBytes()); + ConnectSchema.validateValue(Schema.BYTES_SCHEMA, ByteBuffer.wrap("a byte array".getBytes())); + ConnectSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, 3)); + ConnectSchema.validateValue( + SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build(), + Collections.singletonMap(1, "value") + ); + // Struct tests the basic struct layout + complex field types + nested structs + Struct structValue = new Struct(STRUCT_SCHEMA) + .put("first", 1) + .put("second", "foo") + .put("array", Arrays.asList(1, 2, 3)) + .put("map", Collections.singletonMap(1, "value")) + .put("nested", new Struct(FLAT_STRUCT_SCHEMA).put("field", 12)); + ConnectSchema.validateValue(STRUCT_SCHEMA, structValue); + } + + @Test + public void testValidateValueMatchingLogicalType() { + ConnectSchema.validateValue(Decimal.schema(2), new BigDecimal(new BigInteger("156"), 2)); + ConnectSchema.validateValue(Date.SCHEMA, new java.util.Date(0)); + ConnectSchema.validateValue(Time.SCHEMA, new java.util.Date(0)); + ConnectSchema.validateValue(Timestamp.SCHEMA, new java.util.Date(0)); + } + + // To avoid requiring excessive numbers of tests, these checks for invalid types use a similar type where possible + // to only include a single test for each type + + @Test(expected = DataException.class) + public void testValidateValueMismatchInt8() { + ConnectSchema.validateValue(Schema.INT8_SCHEMA, 1); + } + + @Test(expected = DataException.class) + public void 
testValidateValueMismatchInt16() { + ConnectSchema.validateValue(Schema.INT16_SCHEMA, 1); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchInt32() { + ConnectSchema.validateValue(Schema.INT32_SCHEMA, (long) 1); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchInt64() { + ConnectSchema.validateValue(Schema.INT64_SCHEMA, 1); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchFloat() { + ConnectSchema.validateValue(Schema.FLOAT32_SCHEMA, 1.0); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchDouble() { + ConnectSchema.validateValue(Schema.FLOAT64_SCHEMA, 1.f); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchBoolean() { + ConnectSchema.validateValue(Schema.BOOLEAN_SCHEMA, 1.f); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchString() { + // CharSequence is a similar type (supertype of String), but we restrict to String. + CharBuffer cbuf = CharBuffer.wrap("abc"); + ConnectSchema.validateValue(Schema.STRING_SCHEMA, cbuf); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchBytes() { + ConnectSchema.validateValue(Schema.BYTES_SCHEMA, new Object[]{1, "foo"}); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchArray() { + ConnectSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList("a", "b", "c")); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchArraySomeMatch() { + // Even if some match the right type, this should fail if any mismatch. 
In this case, type erasure loses + // the fact that the list is actually List<Object>, but we couldn't tell if only checking the first element + ConnectSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, "c")); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchMapKey() { + ConnectSchema.validateValue(MAP_INT_STRING_SCHEMA, Collections.singletonMap("wrong key type", "value")); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchMapValue() { + ConnectSchema.validateValue(MAP_INT_STRING_SCHEMA, Collections.singletonMap(1, 2)); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchMapSomeKeys() { + Map<Object, String> data = new HashMap<>(); + data.put(1, "abc"); + data.put("wrong", "it's as easy as one two three"); + ConnectSchema.validateValue(MAP_INT_STRING_SCHEMA, data); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchMapSomeValues() { + Map<Integer, Object> data = new HashMap<>(); + data.put(1, "abc"); + data.put(2, "wrong".getBytes()); + ConnectSchema.validateValue(MAP_INT_STRING_SCHEMA, data); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchStructWrongSchema() { + // Completely mismatching schemas + ConnectSchema.validateValue( + FLAT_STRUCT_SCHEMA, + new Struct(SchemaBuilder.struct().field("x", Schema.INT32_SCHEMA).build()).put("x", 1) + ); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchStructWrongNestedSchema() { + // Top-level schema matches, but nested does not. 
+ ConnectSchema.validateValue( + PARENT_STRUCT_SCHEMA, + new Struct(PARENT_STRUCT_SCHEMA) + .put("nested", new Struct(SchemaBuilder.struct().field("x", Schema.INT32_SCHEMA).build()).put("x", 1)) + ); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchDecimal() { + ConnectSchema.validateValue(Decimal.schema(2), new BigInteger("156")); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchDate() { + ConnectSchema.validateValue(Date.SCHEMA, 1000L); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchTime() { + ConnectSchema.validateValue(Time.SCHEMA, 1000L); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchTimestamp() { + ConnectSchema.validateValue(Timestamp.SCHEMA, 1000L); + } + + @Test + public void testPrimitiveEquality() { + // Test that primitive types, which only need to consider all the type & metadata fields, handle equality correctly + ConnectSchema s1 = new ConnectSchema(Schema.Type.INT8, false, null, "name", 2, "doc"); + ConnectSchema s2 = new ConnectSchema(Schema.Type.INT8, false, null, "name", 2, "doc"); + ConnectSchema differentType = new ConnectSchema(Schema.Type.INT16, false, null, "name", 2, "doc"); + ConnectSchema differentOptional = new ConnectSchema(Schema.Type.INT8, true, null, "name", 2, "doc"); + ConnectSchema differentDefault = new ConnectSchema(Schema.Type.INT8, false, true, "name", 2, "doc"); + ConnectSchema differentName = new ConnectSchema(Schema.Type.INT8, false, null, "otherName", 2, "doc"); + ConnectSchema differentVersion = new ConnectSchema(Schema.Type.INT8, false, null, "name", 4, "doc"); + ConnectSchema differentDoc = new ConnectSchema(Schema.Type.INT8, false, null, "name", 2, "other doc"); + ConnectSchema differentParameters = new ConnectSchema(Schema.Type.INT8, false, null, "name", 2, "doc", Collections.singletonMap("param", "value"), null, null, null); + + assertEquals(s1, s2); + 
assertNotEquals(s1, differentType); + assertNotEquals(s1, differentOptional); + assertNotEquals(s1, differentDefault); + assertNotEquals(s1, differentName); + assertNotEquals(s1, differentVersion); + assertNotEquals(s1, differentDoc); + assertNotEquals(s1, differentParameters); + } + + @Test + public void testArrayEquality() { + // Validate that the value type for the array is tested for equality. This test makes sure the same schema object is + // never reused to ensure we're actually checking equality + ConnectSchema s1 = new ConnectSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int8().build()); + ConnectSchema s2 = new ConnectSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int8().build()); + ConnectSchema differentValueSchema = new ConnectSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int16().build()); + + assertEquals(s1, s2); + assertNotEquals(s1, differentValueSchema); + } + + @Test + public void testMapEquality() { + // Same as testArrayEquality, but for both key and value schemas + ConnectSchema s1 = new ConnectSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build()); + ConnectSchema s2 = new ConnectSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build()); + ConnectSchema differentKeySchema = new ConnectSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.string().build(), SchemaBuilder.int16().build()); + ConnectSchema differentValueSchema = new ConnectSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.string().build()); + + assertEquals(s1, s2); + assertNotEquals(s1, differentKeySchema); + assertNotEquals(s1, differentValueSchema); + } + + @Test + public void testStructEquality() { + // Same as 
testArrayEquality, but checks differences in fields. Only does a simple check, relying on tests of + // Field's equals() method to validate all variations in the list of fields will be checked + ConnectSchema s1 = new ConnectSchema(Schema.Type.STRUCT, false, null, null, null, null, null, + Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()), + new Field("field2", 1, SchemaBuilder.int16().build())), null, null); + ConnectSchema s2 = new ConnectSchema(Schema.Type.STRUCT, false, null, null, null, null, null, + Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()), + new Field("field2", 1, SchemaBuilder.int16().build())), null, null); + ConnectSchema differentField = new ConnectSchema(Schema.Type.STRUCT, false, null, null, null, null, null, + Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()), + new Field("different field name", 1, SchemaBuilder.int16().build())), null, null); + + assertEquals(s1, s2); + assertNotEquals(s1, differentField); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/test/java/org/apache/kafka/connect/data/DateTest.java ---------------------------------------------------------------------- diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/DateTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/DateTest.java new file mode 100644 index 0000000..8d6bd5a --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/DateTest.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.data; + +import org.apache.kafka.connect.errors.DataException; +import org.junit.Test; + +import java.util.Calendar; +import java.util.GregorianCalendar; +import java.util.TimeZone; + +import static org.junit.Assert.assertEquals; + +public class DateTest { + private static final GregorianCalendar EPOCH; + private static final GregorianCalendar EPOCH_PLUS_TEN_THOUSAND_DAYS; + private static final GregorianCalendar EPOCH_PLUS_TIME_COMPONENT; + static { + EPOCH = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); + EPOCH.setTimeZone(TimeZone.getTimeZone("UTC")); + + EPOCH_PLUS_TIME_COMPONENT = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 1); + EPOCH_PLUS_TIME_COMPONENT.setTimeZone(TimeZone.getTimeZone("UTC")); + + EPOCH_PLUS_TEN_THOUSAND_DAYS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); + EPOCH_PLUS_TEN_THOUSAND_DAYS.setTimeZone(TimeZone.getTimeZone("UTC")); + EPOCH_PLUS_TEN_THOUSAND_DAYS.add(Calendar.DATE, 10000); + } + + @Test + public void testBuilder() { + Schema plain = Date.SCHEMA; + assertEquals(Date.LOGICAL_NAME, plain.name()); + assertEquals(1, (Object) plain.version()); + } + + @Test + public void testFromLogical() { + assertEquals(0, Date.fromLogical(Date.SCHEMA, EPOCH.getTime())); + assertEquals(10000, Date.fromLogical(Date.SCHEMA, EPOCH_PLUS_TEN_THOUSAND_DAYS.getTime())); + } + + @Test(expected = DataException.class) + public void testFromLogicalInvalidSchema() { + Date.fromLogical(Date.builder().name("invalid").build(), EPOCH.getTime()); + } + + @Test(expected = 
DataException.class) + public void testFromLogicalInvalidHasTimeComponents() { + Date.fromLogical(Date.SCHEMA, EPOCH_PLUS_TIME_COMPONENT.getTime()); + } + + @Test + public void testToLogical() { + assertEquals(EPOCH.getTime(), Date.toLogical(Date.SCHEMA, 0)); + assertEquals(EPOCH_PLUS_TEN_THOUSAND_DAYS.getTime(), Date.toLogical(Date.SCHEMA, 10000)); + } + + @Test(expected = DataException.class) + public void testToLogicalInvalidSchema() { + Date.toLogical(Date.builder().name("invalid").build(), 0); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/test/java/org/apache/kafka/connect/data/DecimalTest.java ---------------------------------------------------------------------- diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/DecimalTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/DecimalTest.java new file mode 100644 index 0000000..27f570a --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/DecimalTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.data; + +import org.junit.Test; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.Collections; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class DecimalTest { + private static final int TEST_SCALE = 2; + private static final BigDecimal TEST_DECIMAL = new BigDecimal(new BigInteger("156"), TEST_SCALE); + private static final BigDecimal TEST_DECIMAL_NEGATIVE = new BigDecimal(new BigInteger("-156"), TEST_SCALE); + private static final byte[] TEST_BYTES = new byte[]{0, -100}; + private static final byte[] TEST_BYTES_NEGATIVE = new byte[]{-1, 100}; + + @Test + public void testBuilder() { + Schema plain = Decimal.builder(2).build(); + assertEquals(Decimal.LOGICAL_NAME, plain.name()); + assertEquals(Collections.singletonMap(Decimal.SCALE_FIELD, "2"), plain.parameters()); + assertEquals(1, (Object) plain.version()); + } + + @Test + public void testFromLogical() { + Schema schema = Decimal.schema(TEST_SCALE); + byte[] encoded = Decimal.fromLogical(schema, TEST_DECIMAL); + assertArrayEquals(TEST_BYTES, encoded); + + encoded = Decimal.fromLogical(schema, TEST_DECIMAL_NEGATIVE); + assertArrayEquals(TEST_BYTES_NEGATIVE, encoded); + } + + @Test + public void testToLogical() { + Schema schema = Decimal.schema(2); + BigDecimal converted = Decimal.toLogical(schema, TEST_BYTES); + assertEquals(TEST_DECIMAL, converted); + + converted = Decimal.toLogical(schema, TEST_BYTES_NEGATIVE); + assertEquals(TEST_DECIMAL_NEGATIVE, converted); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/test/java/org/apache/kafka/connect/data/FieldTest.java ---------------------------------------------------------------------- diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/FieldTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/FieldTest.java new file mode 100644 index 0000000..e7b3a9d --- /dev/null +++ 
b/connect/api/src/test/java/org/apache/kafka/connect/data/FieldTest.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.data; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +public class FieldTest { + + @Test + public void testEquality() { + Field field1 = new Field("name", 0, Schema.INT8_SCHEMA); + Field field2 = new Field("name", 0, Schema.INT8_SCHEMA); + Field differentName = new Field("name2", 0, Schema.INT8_SCHEMA); + Field differentIndex = new Field("name", 1, Schema.INT8_SCHEMA); + Field differentSchema = new Field("name", 0, Schema.INT16_SCHEMA); + + assertEquals(field1, field2); + assertNotEquals(field1, differentName); + assertNotEquals(field1, differentIndex); + assertNotEquals(field1, differentSchema); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java ---------------------------------------------------------------------- diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java 
b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java new file mode 100644 index 0000000..62020f3 --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java @@ -0,0 +1,305 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.data; + +import org.apache.kafka.connect.errors.SchemaBuilderException; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class SchemaBuilderTest { + private static final String NAME = "name"; + private static final Integer VERSION = 2; + private static final String DOC = "doc"; + private static final Map<String, String> NO_PARAMS = null; + + @Test + public void testInt8Builder() { + Schema schema = SchemaBuilder.int8().build(); + assertTypeAndDefault(schema, Schema.Type.INT8, false, null); + assertNoMetadata(schema); + + schema = SchemaBuilder.int8().name(NAME).optional().defaultValue((byte) 12) + .version(VERSION).doc(DOC).build(); + assertTypeAndDefault(schema, Schema.Type.INT8, true, (byte) 12); + assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS); + } + + @Test(expected = SchemaBuilderException.class) + public void testInt8BuilderInvalidDefault() { + SchemaBuilder.int8().defaultValue("invalid"); + } + + @Test + public void testInt16Builder() { + Schema schema = SchemaBuilder.int16().build(); + assertTypeAndDefault(schema, Schema.Type.INT16, false, null); + assertNoMetadata(schema); + + schema = SchemaBuilder.int16().name(NAME).optional().defaultValue((short) 12) + .version(VERSION).doc(DOC).build(); + assertTypeAndDefault(schema, Schema.Type.INT16, true, (short) 12); + assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS); + } + + @Test(expected = SchemaBuilderException.class) + public void testInt16BuilderInvalidDefault() { + SchemaBuilder.int16().defaultValue("invalid"); + } + + @Test + public void testInt32Builder() { + Schema schema = SchemaBuilder.int32().build(); + assertTypeAndDefault(schema, Schema.Type.INT32, false, null); + assertNoMetadata(schema); + + schema = 
SchemaBuilder.int32().name(NAME).optional().defaultValue(12) + .version(VERSION).doc(DOC).build(); + assertTypeAndDefault(schema, Schema.Type.INT32, true, 12); + assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS); + } + + @Test(expected = SchemaBuilderException.class) + public void testInt32BuilderInvalidDefault() { + SchemaBuilder.int32().defaultValue("invalid"); + } + + @Test + public void testInt64Builder() { + Schema schema = SchemaBuilder.int64().build(); + assertTypeAndDefault(schema, Schema.Type.INT64, false, null); + assertNoMetadata(schema); + + schema = SchemaBuilder.int64().name(NAME).optional().defaultValue((long) 12) + .version(VERSION).doc(DOC).build(); + assertTypeAndDefault(schema, Schema.Type.INT64, true, (long) 12); + assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS); + } + + @Test(expected = SchemaBuilderException.class) + public void testInt64BuilderInvalidDefault() { + SchemaBuilder.int64().defaultValue("invalid"); + } + + @Test + public void testFloatBuilder() { + Schema schema = SchemaBuilder.float32().build(); + assertTypeAndDefault(schema, Schema.Type.FLOAT32, false, null); + assertNoMetadata(schema); + + schema = SchemaBuilder.float32().name(NAME).optional().defaultValue(12.f) + .version(VERSION).doc(DOC).build(); + assertTypeAndDefault(schema, Schema.Type.FLOAT32, true, 12.f); + assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS); + } + + @Test(expected = SchemaBuilderException.class) + public void testFloatBuilderInvalidDefault() { + SchemaBuilder.float32().defaultValue("invalid"); + } + + @Test + public void testDoubleBuilder() { + Schema schema = SchemaBuilder.float64().build(); + assertTypeAndDefault(schema, Schema.Type.FLOAT64, false, null); + assertNoMetadata(schema); + + schema = SchemaBuilder.float64().name(NAME).optional().defaultValue(12.0) + .version(VERSION).doc(DOC).build(); + assertTypeAndDefault(schema, Schema.Type.FLOAT64, true, 12.0); + assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS); + } + + @Test(expected 
= SchemaBuilderException.class) + public void testDoubleBuilderInvalidDefault() { + SchemaBuilder.float64().defaultValue("invalid"); + } + + @Test + public void testBooleanBuilder() { + Schema schema = SchemaBuilder.bool().build(); + assertTypeAndDefault(schema, Schema.Type.BOOLEAN, false, null); + assertNoMetadata(schema); + + schema = SchemaBuilder.bool().name(NAME).optional().defaultValue(true) + .version(VERSION).doc(DOC).build(); + assertTypeAndDefault(schema, Schema.Type.BOOLEAN, true, true); + assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS); + } + + @Test(expected = SchemaBuilderException.class) + public void testBooleanBuilderInvalidDefault() { + SchemaBuilder.bool().defaultValue("invalid"); + } + + @Test + public void testStringBuilder() { + Schema schema = SchemaBuilder.string().build(); + assertTypeAndDefault(schema, Schema.Type.STRING, false, null); + assertNoMetadata(schema); + + schema = SchemaBuilder.string().name(NAME).optional().defaultValue("a default string") + .version(VERSION).doc(DOC).build(); + assertTypeAndDefault(schema, Schema.Type.STRING, true, "a default string"); + assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS); + } + + @Test(expected = SchemaBuilderException.class) + public void testStringBuilderInvalidDefault() { + SchemaBuilder.string().defaultValue(true); + } + + @Test + public void testBytesBuilder() { + Schema schema = SchemaBuilder.bytes().build(); + assertTypeAndDefault(schema, Schema.Type.BYTES, false, null); + assertNoMetadata(schema); + + schema = SchemaBuilder.bytes().name(NAME).optional().defaultValue("a default byte array".getBytes()) + .version(VERSION).doc(DOC).build(); + assertTypeAndDefault(schema, Schema.Type.BYTES, true, "a default byte array".getBytes()); + assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS); + } + + @Test(expected = SchemaBuilderException.class) + public void testBytesBuilderInvalidDefault() { + SchemaBuilder.bytes().defaultValue("a string, not bytes"); + } + + + @Test + public void 
testParameters() { + Map<String, String> expectedParameters = new HashMap<>(); + expectedParameters.put("foo", "val"); + expectedParameters.put("bar", "baz"); + + Schema schema = SchemaBuilder.string().parameter("foo", "val").parameter("bar", "baz").build(); + assertTypeAndDefault(schema, Schema.Type.STRING, false, null); + assertMetadata(schema, null, null, null, expectedParameters); + + schema = SchemaBuilder.string().parameters(expectedParameters).build(); + assertTypeAndDefault(schema, Schema.Type.STRING, false, null); + assertMetadata(schema, null, null, null, expectedParameters); + } + + + @Test + public void testStructBuilder() { + Schema schema = SchemaBuilder.struct() + .field("field1", Schema.INT8_SCHEMA) + .field("field2", Schema.INT8_SCHEMA) + .build(); + assertTypeAndDefault(schema, Schema.Type.STRUCT, false, null); + assertEquals(2, schema.fields().size()); + assertEquals("field1", schema.fields().get(0).name()); + assertEquals(0, schema.fields().get(0).index()); + assertEquals(Schema.INT8_SCHEMA, schema.fields().get(0).schema()); + assertEquals("field2", schema.fields().get(1).name()); + assertEquals(1, schema.fields().get(1).index()); + assertEquals(Schema.INT8_SCHEMA, schema.fields().get(1).schema()); + assertNoMetadata(schema); + } + + @Test(expected = SchemaBuilderException.class) + public void testNonStructCantHaveFields() { + SchemaBuilder.int8().field("field", SchemaBuilder.int8().build()); + } + + + @Test + public void testArrayBuilder() { + Schema schema = SchemaBuilder.array(Schema.INT8_SCHEMA).build(); + assertTypeAndDefault(schema, Schema.Type.ARRAY, false, null); + assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA); + assertNoMetadata(schema); + + // Default value + List<Byte> defArray = Arrays.asList((byte) 1, (byte) 2); + schema = SchemaBuilder.array(Schema.INT8_SCHEMA).defaultValue(defArray).build(); + assertTypeAndDefault(schema, Schema.Type.ARRAY, false, defArray); + assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA); + 
assertNoMetadata(schema); + } + + @Test(expected = SchemaBuilderException.class) + public void testArrayBuilderInvalidDefault() { + // Array, but wrong embedded type + SchemaBuilder.array(Schema.INT8_SCHEMA).defaultValue(Arrays.asList("string")).build(); + } + + @Test + public void testMapBuilder() { + Schema schema = SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA).build(); + assertTypeAndDefault(schema, Schema.Type.MAP, false, null); + assertEquals(schema.keySchema(), Schema.INT8_SCHEMA); + assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA); + assertNoMetadata(schema); + + // Default value + Map<Byte, Byte> defMap = Collections.singletonMap((byte) 5, (byte) 10); + schema = SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA) + .defaultValue(defMap).build(); + assertTypeAndDefault(schema, Schema.Type.MAP, false, defMap); + assertEquals(schema.keySchema(), Schema.INT8_SCHEMA); + assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA); + assertNoMetadata(schema); + } + + @Test(expected = SchemaBuilderException.class) + public void testMapBuilderInvalidDefault() { + // Map, but wrong embedded type + Map<Byte, String> defMap = Collections.singletonMap((byte) 5, "foo"); + SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA) + .defaultValue(defMap).build(); + } + + + + private void assertTypeAndDefault(Schema schema, Schema.Type type, boolean optional, Object defaultValue) { + assertEquals(type, schema.type()); + assertEquals(optional, schema.isOptional()); + if (type == Schema.Type.BYTES) { + // byte[] is not comparable, need to wrap to check correctly + if (defaultValue == null) + assertNull(schema.defaultValue()); + else + assertEquals(ByteBuffer.wrap((byte[]) defaultValue), ByteBuffer.wrap((byte[]) schema.defaultValue())); + } else { + assertEquals(defaultValue, schema.defaultValue()); + } + } + + private void assertMetadata(Schema schema, String name, Integer version, String doc, Map<String, String> parameters) { + assertEquals(name, 
schema.name()); + assertEquals(version, schema.version()); + assertEquals(doc, schema.doc()); + assertEquals(parameters, schema.parameters()); + } + + private void assertNoMetadata(Schema schema) { + assertMetadata(schema, null, null, null, null); + } +}