http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
new file mode 100644
index 0000000..1890062
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.source;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+
+import java.util.Map;
+
+/**
+ * <p>
+ * SourceRecords are generated by SourceTasks and passed to Kafka Connect for storage in
+ * Kafka. In addition to the standard fields in {@link ConnectRecord} which specify where data is stored
+ * in Kafka, they also include a sourcePartition and sourceOffset.
+ * </p>
+ * <p>
+ * The sourcePartition represents a single input partition that the record came from (e.g. a filename, table
+ * name, or topic-partition). The sourceOffset represents a position in that sourcePartition which can be used
+ * to resume consumption of data.
+ * </p>
+ * <p>
+ * These values can have arbitrary structure and should be represented using
+ * org.apache.kafka.connect.data objects (or primitive values). For example, a database connector
+ * might specify the sourcePartition as a record containing { "db": "database_name", "table": "table_name"}
+ * and the sourceOffset as a Long containing the timestamp of the row.
+ * </p>
+ */
+@InterfaceStability.Unstable
+public class SourceRecord extends ConnectRecord {
+    private final Map<String, ?> sourcePartition;
+    private final Map<String, ?> sourceOffset;
+
+    public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
+                        String topic, Integer partition, Schema valueSchema, Object value) {
+        this(sourcePartition, sourceOffset, topic, partition, null, null, valueSchema, value);
+    }
+
+    public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
+                        String topic, Schema valueSchema, Object value) {
+        this(sourcePartition, sourceOffset, topic, null, null, null, valueSchema, value);
+    }
+
+    public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
+                        String topic, Integer partition,
+                        Schema keySchema, Object key, Schema valueSchema, Object value) {
+        super(topic, partition, keySchema, key, valueSchema, value);
+        this.sourcePartition = sourcePartition;
+        this.sourceOffset = sourceOffset;
+    }
+
+    public Map<String, ?> sourcePartition() {
+        return sourcePartition;
+    }
+
+    public Map<String, ?> sourceOffset() {
+        return sourceOffset;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        if (!super.equals(o))
+            return false;
+
+        SourceRecord that = (SourceRecord) o;
+
+        if (sourcePartition != null ? !sourcePartition.equals(that.sourcePartition) : that.sourcePartition != null)
+            return false;
+        if (sourceOffset != null ? !sourceOffset.equals(that.sourceOffset) : that.sourceOffset != null)
+            return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + (sourcePartition != null ? sourcePartition.hashCode() : 0);
+        result = 31 * result + (sourceOffset != null ? sourceOffset.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "SourceRecord{" +
+                "sourcePartition=" + sourcePartition +
+                ", sourceOffset=" + sourceOffset +
+                "} " + super.toString();
+    }
+}
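
As a worked companion to the javadoc's database example, a source task might build records like
the following minimal sketch. The "table"/"timestamp" keys, the topic name, and the helper class
are hypothetical, connector-specific choices, not part of the API:

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.source.SourceRecord;

    public class ExampleRecordFactory {
        // Hypothetical helper: wrap one database row in a SourceRecord.
        public static SourceRecord recordForRow(String table, long rowTimestamp, String rowValue) {
            // sourcePartition identifies the input the data came from (here, a table).
            Map<String, String> sourcePartition = Collections.singletonMap("table", table);
            // sourceOffset records how far we have read within that partition.
            Map<String, Long> sourceOffset = Collections.singletonMap("timestamp", rowTimestamp);
            return new SourceRecord(sourcePartition, sourceOffset, "db-topic", Schema.STRING_SCHEMA, rowValue);
        }
    }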

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
new file mode 100644
index 0000000..5110504
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.source;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.connect.connector.Task;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * SourceTask is a Task that pulls records from another system for storage in Kafka.
+ */
+@InterfaceStability.Unstable
+public abstract class SourceTask implements Task {
+
+    protected SourceTaskContext context;
+
+    /**
+     * Initialize this SourceTask with the specified context object.
+     */
+    public void initialize(SourceTaskContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Start the Task. This should handle any configuration parsing and one-time setup of the task.
+     * @param props initial configuration
+     */
+    @Override
+    public abstract void start(Map<String, String> props);
+
+    /**
+     * Poll this SourceTask for new records. This method should block if no data is currently
+     * available.
+     *
+     * @return a list of source records
+     */
+    public abstract List<SourceRecord> poll() throws InterruptedException;
+
+    /**
+     * <p>
+     * Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This
+     * method should block until the commit is complete.
+     * </p>
+     * <p>
+     * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
+     * automatically. This hook is provided for systems that also need to store offsets internally
+     * in their own system.
+     * </p>
+     */
+    public void commit() throws InterruptedException {
+        // This space intentionally left blank.
+    }
+
+    /**
+     * Signal this SourceTask to stop. In SourceTasks, this method only needs to signal to the task that it should stop
+     * trying to poll for new data and interrupt any outstanding poll() requests. It is not required that the task has
+     * fully stopped. Note that this method is necessarily invoked from a different thread than {@link #poll()} and
+     * {@link #commit()}.
+     *
+     * For example, if a task uses a {@link java.nio.channels.Selector} to receive data over the network, this method
+     * could set a flag that will force {@link #poll()} to exit immediately and invoke
+     * {@link java.nio.channels.Selector#wakeup()} to interrupt any ongoing requests.
+     */
+    public abstract void stop();
+}
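
The stop-flag pattern described in the stop() javadoc can be sketched as below. This is an
illustrative skeleton, not a real connector: readAvailableRecords() is a hypothetical
placeholder for actual blocking I/O, and returning null from poll() is used to mean "no data":

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;

    public class ExampleSourceTask extends SourceTask {
        private volatile boolean stopping;

        @Override
        public String version() {
            return "0.1";
        }

        @Override
        public void start(Map<String, String> props) {
            stopping = false;  // parse configuration and open resources here
        }

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            while (!stopping) {
                // Block briefly waiting for data, then hand back whatever arrived.
                List<SourceRecord> records = readAvailableRecords();
                if (!records.isEmpty())
                    return records;
            }
            return null;  // stopping: treated here as "no data"
        }

        @Override
        public void stop() {
            stopping = true;  // may be called from another thread; poll() exits soon after
        }

        private List<SourceRecord> readAvailableRecords() throws InterruptedException {
            Thread.sleep(100);  // placeholder for real blocking I/O
            return new ArrayList<>();
        }
    }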

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
new file mode 100644
index 0000000..200fa5f
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.source;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+
+/**
+ * SourceTaskContext is provided to SourceTasks to allow them to interact with the underlying
+ * runtime.
+ */
+@InterfaceStability.Unstable
+public interface SourceTaskContext {
+    /**
+     * Get the OffsetStorageReader for this SourceTask.
+     */
+    OffsetStorageReader offsetStorageReader();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java
new file mode 100644
index 0000000..158ddb1
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+
+import java.util.Map;
+
+/**
+ * The Converter interface provides support for translating between Kafka Connect's runtime data format
+ * and byte[]. Internally, this likely includes an intermediate step to the format used by the serialization
+ * layer (e.g. JsonNode, GenericRecord, Message).
+ */
+@InterfaceStability.Unstable
+public interface Converter {
+
+    /**
+     * Configure this class.
+     * @param configs configs in key/value pairs
+     * @param isKey whether the converter is being used for keys or values
+     */
+    void configure(Map<String, ?> configs, boolean isKey);
+
+    /**
+     * Convert a Kafka Connect data object to a native object for serialization.
+     * @param topic the topic associated with the data
+     * @param schema the schema for the value
+     * @param value the value to convert
+     * @return the serialized value
+     */
+    byte[] fromConnectData(String topic, Schema schema, Object value);
+
+    /**
+     * Convert a native object to a Kafka Connect data object.
+     * @param topic the topic associated with the data
+     * @param value the value to convert
+     * @return an object containing the {@link Schema} and the converted value
+     */
+    SchemaAndValue toConnectData(String topic, byte[] value);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReader.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReader.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReader.java
new file mode 100644
index 0000000..9307c23
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReader.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * <p>
+ * OffsetStorageReader provides access to the offset storage used by sources. This can be used by
+ * connectors to determine offsets to start consuming data from. This is most commonly used during
+ * initialization of a task, but can also be used during runtime, e.g. when reconfiguring a task.
+ * </p>
+ * <p>
+ * Offsets are always defined as Maps of Strings to primitive types, i.e. all types supported by
+ * {@link org.apache.kafka.connect.data.Schema} other than Array, Map, and Struct.
+ * </p>
+ */
+@InterfaceStability.Unstable
+public interface OffsetStorageReader {
+    /**
+     * Get the offset for the specified partition. If the data isn't already available locally, this
+     * gets it from the backing store, which may require some network round trips.
+     *
+     * @param partition object uniquely identifying the partition of data
+     * @return object uniquely identifying the offset in the partition of data
+     */
+    <T> Map<String, Object> offset(Map<String, T> partition);
+
+    /**
+     * <p>
+     * Get a set of offsets for the specified partition identifiers. This may be more efficient
+     * than calling {@link #offset(Map)} repeatedly.
+     * </p>
+     * <p>
+     * Note that when errors occur, this method omits the associated data and tries to return as
+     * many of the requested values as possible. This allows a task that's managing many partitions to
+     * still proceed with any available data. Therefore, implementations should take care to check
+     * that the data is actually available in the returned response. The only case when an
+     * exception will be thrown is if the entire request failed, e.g. because the underlying
+     * storage was unavailable.
+     * </p>
+     *
+     * @param partitions set of identifiers for partitions of data
+     * @return a map of partition identifiers to decoded offsets
+     */
+    <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions);
+}
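
For illustration, a task's start() might consult the reader to decide where to resume. This
sketch assumes the connector stores a "timestamp" offset per "table" partition; both keys and
the helper class are hypothetical, connector-specific choices:

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.connect.storage.OffsetStorageReader;

    public class OffsetLookupExample {
        // Hypothetical helper: find the timestamp to resume from for one table.
        public static long resumeTimestamp(OffsetStorageReader reader, String table) {
            Map<String, String> partition = Collections.singletonMap("table", table);
            Map<String, Object> offset = reader.offset(partition);
            if (offset == null || offset.get("timestamp") == null)
                return 0L;  // no stored offset: start from the beginning
            return (Long) offset.get("timestamp");
        }
    }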

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
new file mode 100644
index 0000000..5859f18
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.DataException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link Converter} implementation that only supports serializing to strings. When converting Kafka Connect data to bytes,
+ * the schema will be ignored and {@link Object#toString()} will always be invoked to convert the data to a String.
+ * When converting from bytes to Kafka Connect format, the converter will only ever return an optional string schema and
+ * a string or null.
+ *
+ * Encoding configuration is identical to {@link StringSerializer} and {@link StringDeserializer}, but for convenience
+ * this class can also be configured to use the same encoding for both encoding and decoding with the converter.encoding
+ * setting.
+ */
+public class StringConverter implements Converter {
+    private final StringSerializer serializer = new StringSerializer();
+    private final StringDeserializer deserializer = new StringDeserializer();
+
+    public StringConverter() {
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        Map<String, Object> serializerConfigs = new HashMap<>();
+        serializerConfigs.putAll(configs);
+        Map<String, Object> deserializerConfigs = new HashMap<>();
+        deserializerConfigs.putAll(configs);
+
+        Object encodingValue = configs.get("converter.encoding");
+        if (encodingValue != null) {
+            serializerConfigs.put("serializer.encoding", encodingValue);
+            deserializerConfigs.put("deserializer.encoding", encodingValue);
+        }
+
+        serializer.configure(serializerConfigs, isKey);
+        deserializer.configure(deserializerConfigs, isKey);
+    }
+
+    @Override
+    public byte[] fromConnectData(String topic, Schema schema, Object value) {
+        try {
+            return serializer.serialize(topic, value == null ? null : value.toString());
+        } catch (SerializationException e) {
+            throw new DataException("Failed to serialize to a string: ", e);
+        }
+    }
+
+    @Override
+    public SchemaAndValue toConnectData(String topic, byte[] value) {
+        try {
+            return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA, deserializer.deserialize(topic, value));
+        } catch (SerializationException e) {
+            throw new DataException("Failed to deserialize string: ", e);
+        }
+    }
+}
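
A quick round trip through the converter looks like the following sketch; the topic name is
arbitrary and UTF-8 is just an example value for the converter.encoding setting:

    import java.util.Collections;

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.storage.StringConverter;

    public class StringConverterRoundTrip {
        public static void main(String[] args) {
            StringConverter converter = new StringConverter();
            converter.configure(Collections.singletonMap("converter.encoding", "UTF-8"), false);

            // The schema is ignored on serialization; toString() is applied to the value.
            byte[] bytes = converter.fromConnectData("my-topic", Schema.STRING_SCHEMA, "hello");

            // Deserialization always yields an optional string schema.
            SchemaAndValue back = converter.toConnectData("my-topic", bytes);
            System.out.println(back.schema() + " / " + back.value());  // expect "hello"
        }
    }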

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/util/ConnectorUtils.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/util/ConnectorUtils.java b/connect/api/src/main/java/org/apache/kafka/connect/util/ConnectorUtils.java
new file mode 100644
index 0000000..35250eb
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/util/ConnectorUtils.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utilities that connector implementations might find useful. Contains common building blocks
+ * for writing connectors.
+ */
+@InterfaceStability.Unstable
+public class ConnectorUtils {
+    /**
+     * Given a list of elements and a target number of groups, generates a list of groups of
+     * elements to match the target number of groups, spreading them evenly among the groups.
+     * This generates groups with contiguous elements, which results in intuitive ordering if
+     * your elements are also ordered (e.g. alphabetical lists of table names if you sort
+     * table names alphabetically to generate the raw partitions) or can result in efficient
+     * partitioning if elements are sorted according to some criteria that affects performance
+     * (e.g. topic partitions with the same leader).
+     *
+     * @param elements list of elements to partition
+     * @param numGroups the number of output groups to generate.
+     */
+    public static <T> List<List<T>> groupPartitions(List<T> elements, int numGroups) {
+        if (numGroups <= 0)
+            throw new IllegalArgumentException("Number of groups must be positive.");
+
+        List<List<T>> result = new ArrayList<>(numGroups);
+
+        // Each group has either n+1 or n raw partitions
+        int perGroup = elements.size() / numGroups;
+        int leftover = elements.size() - (numGroups * perGroup);
+
+        int assigned = 0;
+        for (int group = 0; group < numGroups; group++) {
+            int numThisGroup = group < leftover ? perGroup + 1 : perGroup;
+            List<T> groupList = new ArrayList<>(numThisGroup);
+            for (int i = 0; i < numThisGroup; i++) {
+                groupList.add(elements.get(assigned));
+                assigned++;
+            }
+            result.add(groupList);
+        }
+
+        return result;
+    }
+}
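
For example, splitting five table names into two groups yields contiguous groups of sizes 3 and
2 (5 / 2 = 2 per group, with the one leftover element going to the first group):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.kafka.connect.util.ConnectorUtils;

    public class GroupPartitionsExample {
        public static void main(String[] args) {
            List<String> tables = Arrays.asList("a", "b", "c", "d", "e");
            List<List<String>> groups = ConnectorUtils.groupPartitions(tables, 2);
            System.out.println(groups);  // [[a, b, c], [d, e]]
        }
    }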

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java
new file mode 100644
index 0000000..7ea1de2
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.connector;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConnectorReconfigurationTest {
+
+    @Test
+    public void testDefaultReconfigure() throws Exception {
+        TestConnector conn = new TestConnector(false);
+        conn.reconfigure(Collections.<String, String>emptyMap());
+        assertEquals(conn.stopOrder, 0);
+        assertEquals(conn.configureOrder, 1);
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testReconfigureStopException() throws Exception {
+        TestConnector conn = new TestConnector(true);
+        conn.reconfigure(Collections.<String, String>emptyMap());
+    }
+
+    private static class TestConnector extends Connector {
+        private boolean stopException;
+        private int order = 0;
+        public int stopOrder = -1;
+        public int configureOrder = -1;
+
+        public TestConnector(boolean stopException) {
+            this.stopException = stopException;
+        }
+
+        @Override
+        public String version() {
+            return "1.0";
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+            configureOrder = order++;
+        }
+
+        @Override
+        public Class<? extends Task> taskClass() {
+            return null;
+        }
+
+        @Override
+        public List<Map<String, String>> taskConfigs(int count) {
+            return null;
+        }
+
+        @Override
+        public void stop() {
+            stopOrder = order++;
+            if (stopException)
+                throw new ConnectException("error");
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java
new file mode 100644
index 0000000..4388ade
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.errors.DataException;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+
+public class ConnectSchemaTest {
+    private static final Schema MAP_INT_STRING_SCHEMA = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build();
+    private static final Schema FLAT_STRUCT_SCHEMA = SchemaBuilder.struct()
+            .field("field", Schema.INT32_SCHEMA)
+            .build();
+    private static final Schema STRUCT_SCHEMA = SchemaBuilder.struct()
+            .field("first", Schema.INT32_SCHEMA)
+            .field("second", Schema.STRING_SCHEMA)
+            .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
+            .field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, 
Schema.STRING_SCHEMA).build())
+            .field("nested", FLAT_STRUCT_SCHEMA)
+            .build();
+    private static final Schema PARENT_STRUCT_SCHEMA = SchemaBuilder.struct()
+            .field("nested", FLAT_STRUCT_SCHEMA)
+            .build();
+
+    @Test
+    public void testFieldsOnStructSchema() {
+        Schema schema = SchemaBuilder.struct()
+                .field("foo", Schema.BOOLEAN_SCHEMA)
+                .field("bar", Schema.INT32_SCHEMA)
+                .build();
+
+        assertEquals(2, schema.fields().size());
+        // Validate field lookup by name
+        Field foo = schema.field("foo");
+        assertEquals(0, foo.index());
+        Field bar = schema.field("bar");
+        assertEquals(1, bar.index());
+        // Any other field name should fail
+        assertNull(schema.field("other"));
+    }
+
+
+    @Test(expected = DataException.class)
+    public void testFieldsOnlyValidForStructs() {
+        Schema.INT8_SCHEMA.fields();
+    }
+
+    @Test
+    public void testValidateValueMatchingType() {
+        ConnectSchema.validateValue(Schema.INT8_SCHEMA, (byte) 1);
+        ConnectSchema.validateValue(Schema.INT16_SCHEMA, (short) 1);
+        ConnectSchema.validateValue(Schema.INT32_SCHEMA, 1);
+        ConnectSchema.validateValue(Schema.INT64_SCHEMA, (long) 1);
+        ConnectSchema.validateValue(Schema.FLOAT32_SCHEMA, 1.f);
+        ConnectSchema.validateValue(Schema.FLOAT64_SCHEMA, 1.);
+        ConnectSchema.validateValue(Schema.BOOLEAN_SCHEMA, true);
+        ConnectSchema.validateValue(Schema.STRING_SCHEMA, "a string");
+        ConnectSchema.validateValue(Schema.BYTES_SCHEMA, "a byte 
array".getBytes());
+        ConnectSchema.validateValue(Schema.BYTES_SCHEMA, ByteBuffer.wrap("a 
byte array".getBytes()));
+        
ConnectSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), 
Arrays.asList(1, 2, 3));
+        ConnectSchema.validateValue(
+                SchemaBuilder.map(Schema.INT32_SCHEMA, 
Schema.STRING_SCHEMA).build(),
+                Collections.singletonMap(1, "value")
+        );
+        // Struct tests the basic struct layout + complex field types + nested structs
+        Struct structValue = new Struct(STRUCT_SCHEMA)
+                .put("first", 1)
+                .put("second", "foo")
+                .put("array", Arrays.asList(1, 2, 3))
+                .put("map", Collections.singletonMap(1, "value"))
+                .put("nested", new Struct(FLAT_STRUCT_SCHEMA).put("field", 
12));
+        ConnectSchema.validateValue(STRUCT_SCHEMA, structValue);
+    }
+
+    @Test
+    public void testValidateValueMatchingLogicalType() {
+        ConnectSchema.validateValue(Decimal.schema(2), new BigDecimal(new BigInteger("156"), 2));
+        ConnectSchema.validateValue(Date.SCHEMA, new java.util.Date(0));
+        ConnectSchema.validateValue(Time.SCHEMA, new java.util.Date(0));
+        ConnectSchema.validateValue(Timestamp.SCHEMA, new java.util.Date(0));
+    }
+
+    // To avoid requiring excessive numbers of tests, these checks for invalid types use a similar type where possible
+    // to only include a single test for each type
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchInt8() {
+        ConnectSchema.validateValue(Schema.INT8_SCHEMA, 1);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchInt16() {
+        ConnectSchema.validateValue(Schema.INT16_SCHEMA, 1);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchInt32() {
+        ConnectSchema.validateValue(Schema.INT32_SCHEMA, (long) 1);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchInt64() {
+        ConnectSchema.validateValue(Schema.INT64_SCHEMA, 1);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchFloat() {
+        ConnectSchema.validateValue(Schema.FLOAT32_SCHEMA, 1.0);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchDouble() {
+        ConnectSchema.validateValue(Schema.FLOAT64_SCHEMA, 1.f);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchBoolean() {
+        ConnectSchema.validateValue(Schema.BOOLEAN_SCHEMA, 1.f);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchString() {
+        // CharSequence is a similar type (supertype of String), but we restrict to String.
+        CharBuffer cbuf = CharBuffer.wrap("abc");
+        ConnectSchema.validateValue(Schema.STRING_SCHEMA, cbuf);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchBytes() {
+        ConnectSchema.validateValue(Schema.BYTES_SCHEMA, new Object[]{1, "foo"});
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchArray() {
+        ConnectSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList("a", "b", "c"));
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchArraySomeMatch() {
+        // Even if some match the right type, this should fail if any mismatch. In this case, type erasure loses
+        // the fact that the list is actually List<Object>, but we couldn't tell if only checking the first element
+        ConnectSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, "c"));
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchMapKey() {
+        ConnectSchema.validateValue(MAP_INT_STRING_SCHEMA, Collections.singletonMap("wrong key type", "value"));
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchMapValue() {
+        ConnectSchema.validateValue(MAP_INT_STRING_SCHEMA, Collections.singletonMap(1, 2));
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchMapSomeKeys() {
+        Map<Object, String> data = new HashMap<>();
+        data.put(1, "abc");
+        data.put("wrong", "it's as easy as one two three");
+        ConnectSchema.validateValue(MAP_INT_STRING_SCHEMA, data);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchMapSomeValues() {
+        Map<Integer, Object> data = new HashMap<>();
+        data.put(1, "abc");
+        data.put(2, "wrong".getBytes());
+        ConnectSchema.validateValue(MAP_INT_STRING_SCHEMA, data);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchStructWrongSchema() {
+        // Completely mismatching schemas
+        ConnectSchema.validateValue(
+                FLAT_STRUCT_SCHEMA,
+                new Struct(SchemaBuilder.struct().field("x", Schema.INT32_SCHEMA).build()).put("x", 1)
+        );
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchStructWrongNestedSchema() {
+        // Top-level schema matches, but nested does not.
+        ConnectSchema.validateValue(
+                PARENT_STRUCT_SCHEMA,
+                new Struct(PARENT_STRUCT_SCHEMA)
+                        .put("nested", new 
Struct(SchemaBuilder.struct().field("x", Schema.INT32_SCHEMA).build()).put("x", 
1))
+        );
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchDecimal() {
+        ConnectSchema.validateValue(Decimal.schema(2), new BigInteger("156"));
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchDate() {
+        ConnectSchema.validateValue(Date.SCHEMA, 1000L);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchTime() {
+        ConnectSchema.validateValue(Time.SCHEMA, 1000L);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchTimestamp() {
+        ConnectSchema.validateValue(Timestamp.SCHEMA, 1000L);
+    }
+
+    @Test
+    public void testPrimitiveEquality() {
+        // Test that primitive types, which only need to consider all the type & metadata fields, handle equality correctly
+        ConnectSchema s1 = new ConnectSchema(Schema.Type.INT8, false, null, "name", 2, "doc");
+        ConnectSchema s2 = new ConnectSchema(Schema.Type.INT8, false, null, "name", 2, "doc");
+        ConnectSchema differentType = new ConnectSchema(Schema.Type.INT16, false, null, "name", 2, "doc");
+        ConnectSchema differentOptional = new ConnectSchema(Schema.Type.INT8, true, null, "name", 2, "doc");
+        ConnectSchema differentDefault = new ConnectSchema(Schema.Type.INT8, false, true, "name", 2, "doc");
+        ConnectSchema differentName = new ConnectSchema(Schema.Type.INT8, false, null, "otherName", 2, "doc");
+        ConnectSchema differentVersion = new ConnectSchema(Schema.Type.INT8, false, null, "name", 4, "doc");
+        ConnectSchema differentDoc = new ConnectSchema(Schema.Type.INT8, false, null, "name", 2, "other doc");
+        ConnectSchema differentParameters = new ConnectSchema(Schema.Type.INT8, false, null, "name", 2, "doc", Collections.singletonMap("param", "value"), null, null, null);
+
+        assertEquals(s1, s2);
+        assertNotEquals(s1, differentType);
+        assertNotEquals(s1, differentOptional);
+        assertNotEquals(s1, differentDefault);
+        assertNotEquals(s1, differentName);
+        assertNotEquals(s1, differentVersion);
+        assertNotEquals(s1, differentDoc);
+        assertNotEquals(s1, differentParameters);
+    }
+
+    @Test
+    public void testArrayEquality() {
+        // Validate that the value type for the array is tested for equality. This test makes sure the same schema object is
+        // never reused to ensure we're actually checking equality
+        ConnectSchema s1 = new ConnectSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int8().build());
+        ConnectSchema s2 = new ConnectSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int8().build());
+        ConnectSchema differentValueSchema = new ConnectSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int16().build());
+
+        assertEquals(s1, s2);
+        assertNotEquals(s1, differentValueSchema);
+    }
+
+    @Test
+    public void testMapEquality() {
+        // Same as testArrayEquality, but for both key and value schemas
+        ConnectSchema s1 = new ConnectSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build());
+        ConnectSchema s2 = new ConnectSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build());
+        ConnectSchema differentKeySchema = new ConnectSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.string().build(), SchemaBuilder.int16().build());
+        ConnectSchema differentValueSchema = new ConnectSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.string().build());
+
+        assertEquals(s1, s2);
+        assertNotEquals(s1, differentKeySchema);
+        assertNotEquals(s1, differentValueSchema);
+    }
+
+    @Test
+    public void testStructEquality() {
+        // Same as testArrayEquality, but checks differences in fields. Only does a simple check, relying on tests of
+        // Field's equals() method to validate all variations in the list of fields will be checked
+        ConnectSchema s1 = new ConnectSchema(Schema.Type.STRUCT, false, null, null, null, null, null,
+                Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
+                        new Field("field2", 1, SchemaBuilder.int16().build())), null, null);
+        ConnectSchema s2 = new ConnectSchema(Schema.Type.STRUCT, false, null, null, null, null, null,
+                Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
+                        new Field("field2", 1, SchemaBuilder.int16().build())), null, null);
+        ConnectSchema differentField = new ConnectSchema(Schema.Type.STRUCT, false, null, null, null, null, null,
+                Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
+                        new Field("different field name", 1, SchemaBuilder.int16().build())), null, null);
+
+        assertEquals(s1, s2);
+        assertNotEquals(s1, differentField);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/test/java/org/apache/kafka/connect/data/DateTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/DateTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/DateTest.java
new file mode 100644
index 0000000..8d6bd5a
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/DateTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.errors.DataException;
+import org.junit.Test;
+
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+
+public class DateTest {
+    private static final GregorianCalendar EPOCH;
+    private static final GregorianCalendar EPOCH_PLUS_TEN_THOUSAND_DAYS;
+    private static final GregorianCalendar EPOCH_PLUS_TIME_COMPONENT;
+    static {
+        EPOCH = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        EPOCH.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        EPOCH_PLUS_TIME_COMPONENT = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 1);
+        EPOCH_PLUS_TIME_COMPONENT.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        EPOCH_PLUS_TEN_THOUSAND_DAYS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        EPOCH_PLUS_TEN_THOUSAND_DAYS.setTimeZone(TimeZone.getTimeZone("UTC"));
+        EPOCH_PLUS_TEN_THOUSAND_DAYS.add(Calendar.DATE, 10000);
+    }
+
+    @Test
+    public void testBuilder() {
+        Schema plain = Date.SCHEMA;
+        assertEquals(Date.LOGICAL_NAME, plain.name());
+        assertEquals(1, (Object) plain.version());
+    }
+
+    @Test
+    public void testFromLogical() {
+        assertEquals(0, Date.fromLogical(Date.SCHEMA, EPOCH.getTime()));
+        assertEquals(10000, Date.fromLogical(Date.SCHEMA, EPOCH_PLUS_TEN_THOUSAND_DAYS.getTime()));
+    }
+
+    @Test(expected = DataException.class)
+    public void testFromLogicalInvalidSchema() {
+        Date.fromLogical(Date.builder().name("invalid").build(), EPOCH.getTime());
+    }
+
+    @Test(expected = DataException.class)
+    public void testFromLogicalInvalidHasTimeComponents() {
+        Date.fromLogical(Date.SCHEMA, EPOCH_PLUS_TIME_COMPONENT.getTime());
+    }
+
+    @Test
+    public void testToLogical() {
+        assertEquals(EPOCH.getTime(), Date.toLogical(Date.SCHEMA, 0));
+        assertEquals(EPOCH_PLUS_TEN_THOUSAND_DAYS.getTime(), Date.toLogical(Date.SCHEMA, 10000));
+    }
+
+    @Test(expected = DataException.class)
+    public void testToLogicalInvalidSchema() {
+        Date.toLogical(Date.builder().name("invalid").build(), 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/test/java/org/apache/kafka/connect/data/DecimalTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/DecimalTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/DecimalTest.java
new file mode 100644
index 0000000..27f570a
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/DecimalTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Collections;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class DecimalTest {
+    private static final int TEST_SCALE = 2;
+    private static final BigDecimal TEST_DECIMAL = new BigDecimal(new BigInteger("156"), TEST_SCALE);
+    private static final BigDecimal TEST_DECIMAL_NEGATIVE = new BigDecimal(new BigInteger("-156"), TEST_SCALE);
+    private static final byte[] TEST_BYTES = new byte[]{0, -100};
+    private static final byte[] TEST_BYTES_NEGATIVE = new byte[]{-1, 100};
+
+    @Test
+    public void testBuilder() {
+        Schema plain = Decimal.builder(2).build();
+        assertEquals(Decimal.LOGICAL_NAME, plain.name());
+        assertEquals(Collections.singletonMap(Decimal.SCALE_FIELD, "2"), plain.parameters());
+        assertEquals(1, (Object) plain.version());
+    }
+
+    @Test
+    public void testFromLogical() {
+        Schema schema = Decimal.schema(TEST_SCALE);
+        byte[] encoded = Decimal.fromLogical(schema, TEST_DECIMAL);
+        assertArrayEquals(TEST_BYTES, encoded);
+
+        encoded = Decimal.fromLogical(schema, TEST_DECIMAL_NEGATIVE);
+        assertArrayEquals(TEST_BYTES_NEGATIVE, encoded);
+    }
+
+    @Test
+    public void testToLogical() {
+        Schema schema = Decimal.schema(2);
+        BigDecimal converted = Decimal.toLogical(schema, TEST_BYTES);
+        assertEquals(TEST_DECIMAL, converted);
+
+        converted = Decimal.toLogical(schema, TEST_BYTES_NEGATIVE);
+        assertEquals(TEST_DECIMAL_NEGATIVE, converted);
+    }
+}
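
The test vectors above follow from BigDecimal's big-endian two's-complement encoding of the
unscaled value, which the Decimal logical type appears to rely on. A sketch verifying the
arithmetic, assuming nothing beyond the standard BigDecimal API:

    import java.math.BigDecimal;
    import java.math.BigInteger;
    import java.util.Arrays;

    public class DecimalEncodingSketch {
        public static void main(String[] args) {
            // 1.56 with scale 2 has unscaled value 156 = 0x009C; as a signed,
            // big-endian two's-complement byte array that is {0, -100}.
            BigDecimal positive = new BigDecimal(new BigInteger("156"), 2);
            System.out.println(Arrays.toString(positive.unscaledValue().toByteArray()));  // [0, -100]

            // -1.56 has unscaled value -156 = 0xFF64 in 16 bits, i.e. {-1, 100}.
            BigDecimal negative = new BigDecimal(new BigInteger("-156"), 2);
            System.out.println(Arrays.toString(negative.unscaledValue().toByteArray()));  // [-1, 100]
        }
    }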

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/test/java/org/apache/kafka/connect/data/FieldTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/FieldTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/FieldTest.java
new file mode 100644
index 0000000..e7b3a9d
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/FieldTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class FieldTest {
+
+    @Test
+    public void testEquality() {
+        Field field1 = new Field("name", 0, Schema.INT8_SCHEMA);
+        Field field2 = new Field("name", 0, Schema.INT8_SCHEMA);
+        Field differentName = new Field("name2", 0, Schema.INT8_SCHEMA);
+        Field differentIndex = new Field("name", 1, Schema.INT8_SCHEMA);
+        Field differentSchema = new Field("name", 0, Schema.INT16_SCHEMA);
+
+        assertEquals(field1, field2);
+        assertNotEquals(field1, differentName);
+        assertNotEquals(field1, differentIndex);
+        assertNotEquals(field1, differentSchema);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java
new file mode 100644
index 0000000..62020f3
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaBuilderTest.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.errors.SchemaBuilderException;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class SchemaBuilderTest {
+    private static final String NAME = "name";
+    private static final Integer VERSION = 2;
+    private static final String DOC = "doc";
+    private static final Map<String, String> NO_PARAMS = null;
+
+    @Test
+    public void testInt8Builder() {
+        Schema schema = SchemaBuilder.int8().build();
+        assertTypeAndDefault(schema, Schema.Type.INT8, false, null);
+        assertNoMetadata(schema);
+
+        schema = SchemaBuilder.int8().name(NAME).optional().defaultValue((byte) 12)
+                .version(VERSION).doc(DOC).build();
+        assertTypeAndDefault(schema, Schema.Type.INT8, true, (byte) 12);
+        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testInt8BuilderInvalidDefault() {
+        SchemaBuilder.int8().defaultValue("invalid");
+    }
+
+    @Test
+    public void testInt16Builder() {
+        Schema schema = SchemaBuilder.int16().build();
+        assertTypeAndDefault(schema, Schema.Type.INT16, false, null);
+        assertNoMetadata(schema);
+
+        schema = SchemaBuilder.int16().name(NAME).optional().defaultValue((short) 12)
+                .version(VERSION).doc(DOC).build();
+        assertTypeAndDefault(schema, Schema.Type.INT16, true, (short) 12);
+        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testInt16BuilderInvalidDefault() {
+        SchemaBuilder.int16().defaultValue("invalid");
+    }
+
+    @Test
+    public void testInt32Builder() {
+        Schema schema = SchemaBuilder.int32().build();
+        assertTypeAndDefault(schema, Schema.Type.INT32, false, null);
+        assertNoMetadata(schema);
+
+        schema = SchemaBuilder.int32().name(NAME).optional().defaultValue(12)
+                .version(VERSION).doc(DOC).build();
+        assertTypeAndDefault(schema, Schema.Type.INT32, true, 12);
+        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testInt32BuilderInvalidDefault() {
+        SchemaBuilder.int32().defaultValue("invalid");
+    }
+
+    @Test
+    public void testInt64Builder() {
+        Schema schema = SchemaBuilder.int64().build();
+        assertTypeAndDefault(schema, Schema.Type.INT64, false, null);
+        assertNoMetadata(schema);
+
+        schema = SchemaBuilder.int64().name(NAME).optional().defaultValue((long) 12)
+                .version(VERSION).doc(DOC).build();
+        assertTypeAndDefault(schema, Schema.Type.INT64, true, (long) 12);
+        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testInt64BuilderInvalidDefault() {
+        SchemaBuilder.int64().defaultValue("invalid");
+    }
+
+    @Test
+    public void testFloatBuilder() {
+        Schema schema = SchemaBuilder.float32().build();
+        assertTypeAndDefault(schema, Schema.Type.FLOAT32, false, null);
+        assertNoMetadata(schema);
+
+        schema = SchemaBuilder.float32().name(NAME).optional().defaultValue(12.f)
+                .version(VERSION).doc(DOC).build();
+        assertTypeAndDefault(schema, Schema.Type.FLOAT32, true, 12.f);
+        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testFloatBuilderInvalidDefault() {
+        SchemaBuilder.float32().defaultValue("invalid");
+    }
+
+    @Test
+    public void testDoubleBuilder() {
+        Schema schema = SchemaBuilder.float64().build();
+        assertTypeAndDefault(schema, Schema.Type.FLOAT64, false, null);
+        assertNoMetadata(schema);
+
+        schema = SchemaBuilder.float64().name(NAME).optional().defaultValue(12.0)
+                .version(VERSION).doc(DOC).build();
+        assertTypeAndDefault(schema, Schema.Type.FLOAT64, true, 12.0);
+        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testDoubleBuilderInvalidDefault() {
+        SchemaBuilder.float64().defaultValue("invalid");
+    }
+
+    @Test
+    public void testBooleanBuilder() {
+        Schema schema = SchemaBuilder.bool().build();
+        assertTypeAndDefault(schema, Schema.Type.BOOLEAN, false, null);
+        assertNoMetadata(schema);
+
+        schema = SchemaBuilder.bool().name(NAME).optional().defaultValue(true)
+                .version(VERSION).doc(DOC).build();
+        assertTypeAndDefault(schema, Schema.Type.BOOLEAN, true, true);
+        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testBooleanBuilderInvalidDefault() {
+        SchemaBuilder.bool().defaultValue("invalid");
+    }
+
+    @Test
+    public void testStringBuilder() {
+        Schema schema = SchemaBuilder.string().build();
+        assertTypeAndDefault(schema, Schema.Type.STRING, false, null);
+        assertNoMetadata(schema);
+
+        schema = SchemaBuilder.string().name(NAME).optional().defaultValue("a default string")
+                .version(VERSION).doc(DOC).build();
+        assertTypeAndDefault(schema, Schema.Type.STRING, true, "a default string");
+        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testStringBuilderInvalidDefault() {
+        SchemaBuilder.string().defaultValue(true);
+    }
+
+    @Test
+    public void testBytesBuilder() {
+        Schema schema = SchemaBuilder.bytes().build();
+        assertTypeAndDefault(schema, Schema.Type.BYTES, false, null);
+        assertNoMetadata(schema);
+
+        schema = SchemaBuilder.bytes().name(NAME).optional().defaultValue("a default byte array".getBytes())
+                .version(VERSION).doc(DOC).build();
+        assertTypeAndDefault(schema, Schema.Type.BYTES, true, "a default byte array".getBytes());
+        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testBytesBuilderInvalidDefault() {
+        SchemaBuilder.bytes().defaultValue("a string, not bytes");
+    }
+
+
+    @Test
+    public void testParameters() {
+        Map<String, String> expectedParameters = new HashMap<>();
+        expectedParameters.put("foo", "val");
+        expectedParameters.put("bar", "baz");
+
+        Schema schema = SchemaBuilder.string().parameter("foo", "val").parameter("bar", "baz").build();
+        assertTypeAndDefault(schema, Schema.Type.STRING, false, null);
+        assertMetadata(schema, null, null, null, expectedParameters);
+
+        schema = SchemaBuilder.string().parameters(expectedParameters).build();
+        assertTypeAndDefault(schema, Schema.Type.STRING, false, null);
+        assertMetadata(schema, null, null, null, expectedParameters);
+    }
+
+
+    @Test
+    public void testStructBuilder() {
+        Schema schema = SchemaBuilder.struct()
+                .field("field1", Schema.INT8_SCHEMA)
+                .field("field2", Schema.INT8_SCHEMA)
+                .build();
+        assertTypeAndDefault(schema, Schema.Type.STRUCT, false, null);
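+        // field() appends fields in call order, so index() reflects declaration order.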
+        assertEquals(2, schema.fields().size());
+        assertEquals("field1", schema.fields().get(0).name());
+        assertEquals(0, schema.fields().get(0).index());
+        assertEquals(Schema.INT8_SCHEMA, schema.fields().get(0).schema());
+        assertEquals("field2", schema.fields().get(1).name());
+        assertEquals(1, schema.fields().get(1).index());
+        assertEquals(Schema.INT8_SCHEMA, schema.fields().get(1).schema());
+        assertNoMetadata(schema);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testNonStructCantHaveFields() {
+        SchemaBuilder.int8().field("field", SchemaBuilder.int8().build());
+    }
+
+
+    @Test
+    public void testArrayBuilder() {
+        Schema schema = SchemaBuilder.array(Schema.INT8_SCHEMA).build();
+        assertTypeAndDefault(schema, Schema.Type.ARRAY, false, null);
+        assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA);
+        assertNoMetadata(schema);
+
+        // Default value
+        List<Byte> defArray = Arrays.asList((byte) 1, (byte) 2);
+        schema = SchemaBuilder.array(Schema.INT8_SCHEMA).defaultValue(defArray).build();
+        assertTypeAndDefault(schema, Schema.Type.ARRAY, false, defArray);
+        assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA);
+        assertNoMetadata(schema);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testArrayBuilderInvalidDefault() {
+        // Array, but wrong embedded type
+        SchemaBuilder.array(Schema.INT8_SCHEMA).defaultValue(Arrays.asList("string")).build();
+    }
+
+    @Test
+    public void testMapBuilder() {
+        Schema schema = SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA).build();
+        assertTypeAndDefault(schema, Schema.Type.MAP, false, null);
+        assertEquals(schema.keySchema(), Schema.INT8_SCHEMA);
+        assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA);
+        assertNoMetadata(schema);
+
+        // Default value
+        Map<Byte, Byte> defMap = Collections.singletonMap((byte) 5, (byte) 10);
+        schema = SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA)
+                .defaultValue(defMap).build();
+        assertTypeAndDefault(schema, Schema.Type.MAP, false, defMap);
+        assertEquals(schema.keySchema(), Schema.INT8_SCHEMA);
+        assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA);
+        assertNoMetadata(schema);
+    }
+
+    @Test(expected = SchemaBuilderException.class)
+    public void testMapBuilderInvalidDefault() {
+        // Map, but wrong embedded type
+        Map<Byte, String> defMap = Collections.singletonMap((byte) 5, "foo");
+        SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA)
+                .defaultValue(defMap).build();
+    }
+
+
+
+    private void assertTypeAndDefault(Schema schema, Schema.Type type, boolean optional, Object defaultValue) {
+        assertEquals(type, schema.type());
+        assertEquals(optional, schema.isOptional());
+        if (type == Schema.Type.BYTES) {
+            // byte[] is not comparable, need to wrap to check correctly
+            if (defaultValue == null)
+                assertNull(schema.defaultValue());
+            else
+                assertEquals(ByteBuffer.wrap((byte[]) defaultValue), ByteBuffer.wrap((byte[]) schema.defaultValue()));
+        } else {
+            assertEquals(defaultValue, schema.defaultValue());
+        }
+    }
+
+    private void assertMetadata(Schema schema, String name, Integer version, String doc, Map<String, String> parameters) {
+        assertEquals(name, schema.name());
+        assertEquals(version, schema.version());
+        assertEquals(doc, schema.doc());
+        assertEquals(parameters, schema.parameters());
+    }
+
+    private void assertNoMetadata(Schema schema) {
+        assertMetadata(schema, null, null, null, null);
+    }
+}
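
For readers skimming the patch, a minimal sketch of the fluent SchemaBuilder
API these tests exercise (the schema name and fields are illustrative
examples, not part of the patch):

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;

    // Build a named, versioned struct schema with one required and one
    // optional field, mirroring the calls covered by SchemaBuilderTest.
    Schema addressSchema = SchemaBuilder.struct()
            .name("com.example.Address")   // hypothetical name
            .version(1)
            .doc("Mailing address")
            .field("street", Schema.STRING_SCHEMA)
            .field("zip", SchemaBuilder.int32().optional().build())
            .build();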
