http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
deleted file mode 100644
index 90651ed..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-package org.apache.kafka.copycat.sink;
-
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.copycat.connector.Task;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * SinkTask is a Task that takes records loaded from Kafka and sends them to another system. In
- * addition to the basic {@link #put} interface, SinkTasks must also implement {@link #flush}
- * to support offset commits.
- */
-@InterfaceStability.Unstable
-public abstract class SinkTask implements Task {
-
-    /**
-     * <p>
-     * The configuration key that provides the list of topics that are inputs for this
-     * SinkTask.
-     * </p>
-     */
-    public static final String TOPICS_CONFIG = "topics";
-
-    protected SinkTaskContext context;
-
-    public void initialize(SinkTaskContext context) {
-        this.context = context;
-    }
-
-    /**
-     * Start the Task. This should handle any configuration parsing and one-time setup of the task.
-     * @param props initial configuration
-     */
-    @Override
-    public abstract void start(Map<String, String> props);
-
-    /**
-     * Put the records in the sink. Usually this should send the records to the sink asynchronously
-     * and immediately return.
-     *
-     * If this operation fails, the SinkTask may throw a {@link org.apache.kafka.copycat.errors.RetriableException} to
-     * indicate that the framework should attempt to retry the same call again. Other exceptions will cause the task to
-     * be stopped immediately. {@link SinkTaskContext#timeout(long)} can be used to set the maximum time before the
-     * batch will be retried.
-     *
-     * @param records the set of records to send
-     */
-    public abstract void put(Collection<SinkRecord> records);
-
-    /**
-     * Flush all records that have been {@link #put} for the specified topic-partitions. The
-     * offsets are provided for convenience, but could also be determined by tracking all offsets
-     * included in the SinkRecords passed to {@link #put}.
-     *
-     * @param offsets mapping of TopicPartition to committed offset
-     */
-    public abstract void flush(Map<TopicPartition, OffsetAndMetadata> offsets);
-
-    /**
-     * The SinkTask uses this method to create writers for newly assigned partitions in case of partition
-     * re-assignment. In partition re-assignment, some new partitions may be assigned to the SinkTask.
-     * The SinkTask needs to create writers and perform necessary recovery for the newly assigned partitions.
-     * This method will be called after partition re-assignment completes and before the SinkTask starts
-     * fetching data.
-     * @param partitions The list of partitions that are now assigned to the task (may include
-     *                   partitions previously assigned to the task)
-     */
-    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-    }
-
-    /**
-     * The SinkTask uses this method to close writers and commit offsets for partitions that are
-     * no longer assigned to the SinkTask. This method will be called before a rebalance operation starts
-     * and after the SinkTask stops fetching data.
-     * @param partitions The list of partitions that were assigned to the consumer on the last
-     *                   rebalance
-     */
-    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-    }
-
-    /**
-     * Perform any cleanup to stop this task. In SinkTasks, this method is invoked only once outstanding calls to other
-     * methods have completed (e.g., {@link #put(Collection)} has returned) and a final {@link #flush(Map)} and offset
-     * commit has completed. Implementations of this method should only need to perform final cleanup operations, such
-     * as closing network connections to the sink system.
-     */
-    public abstract void stop();
-}
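
For context, a minimal implementation of the SinkTask API removed above could look like the
following sketch. The class name and the trivial logging behavior are illustrative assumptions,
not part of the Copycat codebase:

    import java.util.Collection;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.copycat.sink.SinkRecord;
    import org.apache.kafka.copycat.sink.SinkTask;

    // Hypothetical example: a sink task that "delivers" records to stdout.
    public class LoggingSinkTask extends SinkTask {
        @Override
        public String version() {
            return "1.0";
        }

        @Override
        public void start(Map<String, String> props) {
            // parse configuration and perform one-time setup here
        }

        @Override
        public void put(Collection<SinkRecord> records) {
            // usually this would hand records to the sink system asynchronously
            for (SinkRecord record : records)
                System.out.println(record);
        }

        @Override
        public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
            // nothing is buffered here, so there is nothing to flush before the offset commit
        }

        @Override
        public void stop() {
            // close any connections to the sink system
        }
    }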

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
deleted file mode 100644
index 763b9a4..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.sink;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Context passed to SinkTasks, allowing them to access utilities in the copycat runtime.
- */
-@InterfaceStability.Unstable
-public interface SinkTaskContext {
-    /**
-     * Reset the consumer offsets for the given topic partitions. SinkTasks should use this if they manage offsets
-     * in the sink data store rather than using Kafka consumer offsets. For example, an HDFS connector might record
-     * offsets in HDFS to provide exactly once delivery. When the SinkTask is started or a rebalance occurs, the task
-     * would reload offsets from HDFS and use this method to reset the consumer to those offsets.
-     *
-     * SinkTasks that do not manage their own offsets do not need to use this method.
-     *
-     * @param offsets map of offsets for topic partitions
-     */
-    void offset(Map<TopicPartition, Long> offsets);
-
-    /**
-     * Reset the consumer offsets for the given topic partition. SinkTasks should use this if they manage offsets
-     * in the sink data store rather than using Kafka consumer offsets. For example, an HDFS connector might record
-     * offsets in HDFS to provide exactly once delivery. When the topic partition is recovered, the task
-     * would reload offsets from HDFS and use this method to reset the consumer to the offset.
-     *
-     * SinkTasks that do not manage their own offsets do not need to use this method.
-     *
-     * @param tp the topic partition to reset offset.
-     * @param offset the offset to reset to.
-     */
-    void offset(TopicPartition tp, long offset);
-
-    /**
-     * Set the timeout in milliseconds. SinkTasks should use this to indicate that they need to retry certain
-     * operations after the timeout. SinkTasks may have operations on external systems that need to be
-     * retried in case of failures. For example, appending a record to an HDFS file may fail due to temporary network
-     * issues. SinkTasks use this method to set how long to wait before retrying.
-     * @param timeoutMs the backoff timeout in milliseconds.
-     */
-    void timeout(long timeoutMs);
-
-    /**
-     * Get the current set of assigned TopicPartitions for this task.
-     * @return the set of currently assigned TopicPartitions
-     */
-    Set<TopicPartition> assignment();
-
-    /**
-     * Pause consumption of messages from the specified TopicPartitions.
-     * @param partitions the partitions which should be paused
-     */
-    void pause(TopicPartition... partitions);
-
-    /**
-     * Resume consumption of messages from previously paused TopicPartitions.
-     * @param partitions the partitions to resume
-     */
-    void resume(TopicPartition... partitions);
-}
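
A task that records offsets in the sink data store, as the offset() javadoc above describes,
might rewind the consumer during partition assignment roughly as follows. This is a hedged
sketch inside a SinkTask implementation; loadOffsetsFromStore() is a hypothetical helper, not
part of the API:

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // recover the positions recorded in the sink system, then rewind the consumer
        Map<TopicPartition, Long> offsets = loadOffsetsFromStore(partitions);
        context.offset(offsets);
    }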

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceConnector.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceConnector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceConnector.java
deleted file mode 100644
index 7258cdf..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceConnector.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-package org.apache.kafka.copycat.source;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.copycat.connector.Connector;
-
-/**
- * SourceConnectors implement the connector interface to pull data from another system and send
- * it to Kafka.
- */
-@InterfaceStability.Unstable
-public abstract class SourceConnector extends Connector {
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
deleted file mode 100644
index 7f54c10..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.source;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.copycat.connector.CopycatRecord;
-import org.apache.kafka.copycat.data.Schema;
-
-import java.util.Map;
-
-/**
- * <p>
- * SourceRecords are generated by SourceTasks and passed to Copycat for storage in
- * Kafka. In addition to the standard fields in CopycatRecord which specify where data is stored
- * in Kafka, they also include a sourcePartition and sourceOffset.
- * </p>
- * <p>
- * The sourcePartition represents a single input sourcePartition that the record came from (e.g. a filename, table
- * name, or topic-partition). The sourceOffset represents a position in that sourcePartition which can be used
- * to resume consumption of data.
- * </p>
- * <p>
- * These values can have arbitrary structure and should be represented using
- * org.apache.kafka.copycat.data objects (or primitive values). For example, a database connector
- * might specify the sourcePartition as a record containing { "db": "database_name", "table":
- * "table_name"} and the sourceOffset as a Long containing the timestamp of the row.
- * </p>
- */
-@InterfaceStability.Unstable
-public class SourceRecord extends CopycatRecord {
-    private final Map<String, ?> sourcePartition;
-    private final Map<String, ?> sourceOffset;
-
-    public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
-                        String topic, Integer partition, Schema valueSchema, Object value) {
-        this(sourcePartition, sourceOffset, topic, partition, null, null, valueSchema, value);
-    }
-
-    public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
-                        String topic, Schema valueSchema, Object value) {
-        this(sourcePartition, sourceOffset, topic, null, null, null, valueSchema, value);
-    }
-
-    public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
-                        String topic, Integer partition,
-                        Schema keySchema, Object key, Schema valueSchema, Object value) {
-        super(topic, partition, keySchema, key, valueSchema, value);
-        this.sourcePartition = sourcePartition;
-        this.sourceOffset = sourceOffset;
-    }
-
-    public Map<String, ?> sourcePartition() {
-        return sourcePartition;
-    }
-
-    public Map<String, ?> sourceOffset() {
-        return sourceOffset;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o)
-            return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
-        if (!super.equals(o))
-            return false;
-
-        SourceRecord that = (SourceRecord) o;
-
-        if (sourcePartition != null ? !sourcePartition.equals(that.sourcePartition) : that.sourcePartition != null)
-            return false;
-        if (sourceOffset != null ? !sourceOffset.equals(that.sourceOffset) : that.sourceOffset != null)
-            return false;
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        int result = super.hashCode();
-        result = 31 * result + (sourcePartition != null ? sourcePartition.hashCode() : 0);
-        result = 31 * result + (sourceOffset != null ? sourceOffset.hashCode() : 0);
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return "SourceRecord{" +
-                "sourcePartition=" + sourcePartition +
-                ", sourceOffset=" + sourceOffset +
-                "} " + super.toString();
-    }
-}
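
Following the class javadoc above, a database connector might construct a SourceRecord like
this sketch; the map keys, topic name, and values are illustrative, not a fixed format:

    // Hypothetical example using the five-argument constructor shown above.
    Map<String, String> sourcePartition = Collections.singletonMap("table", "users");
    Map<String, Long> sourceOffset = Collections.singletonMap("timestamp", 1420099200000L);
    SourceRecord record = new SourceRecord(sourcePartition, sourceOffset,
            "users-topic", Schema.STRING_SCHEMA, "row data");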

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
deleted file mode 100644
index 841943f..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.source;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.copycat.connector.Task;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * SourceTask is a Task that pulls records from another system for storage in Kafka.
- */
-@InterfaceStability.Unstable
-public abstract class SourceTask implements Task {
-
-    protected SourceTaskContext context;
-
-    /**
-     * Initialize this SourceTask with the specified context object.
-     */
-    public void initialize(SourceTaskContext context) {
-        this.context = context;
-    }
-
-    /**
-     * Start the Task. This should handle any configuration parsing and one-time setup of the task.
-     * @param props initial configuration
-     */
-    @Override
-    public abstract void start(Map<String, String> props);
-
-    /**
-     * Poll this SourceTask for new records. This method should block if no data is currently
-     * available.
-     *
-     * @return a list of source records
-     */
-    public abstract List<SourceRecord> poll() throws InterruptedException;
-
-    /**
-     * <p>
-     * Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This
-     * method should block until the commit is complete.
-     * </p>
-     * <p>
-     * SourceTasks are not required to implement this functionality; Copycat will record offsets
-     * automatically. This hook is provided for systems that also need to store offsets internally
-     * in their own system.
-     * </p>
-     */
-    public void commit() throws InterruptedException {
-        // This space intentionally left blank.
-    }
-
-    /**
-     * Signal this SourceTask to stop. In SourceTasks, this method only needs to signal to the task that it should stop
-     * trying to poll for new data and interrupt any outstanding poll() requests. It is not required that the task has
-     * fully stopped. Note that this method necessarily may be invoked from a different thread than {@link #poll()} and
-     * {@link #commit()}.
-     *
-     * For example, if a task uses a {@link java.nio.channels.Selector} to receive data over the network, this method
-     * could set a flag that will force {@link #poll()} to exit immediately and invoke
-     * {@link java.nio.channels.Selector#wakeup()} to interrupt any ongoing requests.
-     */
-    public abstract void stop();
-}
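
The stop() contract above (signal, do not block) is commonly satisfied with a volatile flag
checked from poll(). A minimal sketch; the class name and counter "data source" are invented
for illustration:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.copycat.data.Schema;
    import org.apache.kafka.copycat.source.SourceRecord;
    import org.apache.kafka.copycat.source.SourceTask;

    public class CounterSourceTask extends SourceTask {
        private volatile boolean stopping;
        private long position;

        @Override
        public String version() {
            return "1.0";
        }

        @Override
        public void start(Map<String, String> props) {
            stopping = false;
        }

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            if (stopping)
                return null;
            Thread.sleep(1000); // stand-in for blocking until data is available
            position++;
            return Collections.singletonList(new SourceRecord(
                    Collections.singletonMap("source", "counter"),
                    Collections.singletonMap("position", position),
                    "counter-topic", Schema.INT64_SCHEMA, position));
        }

        @Override
        public void stop() {
            stopping = true; // signal poll() to exit; do not block here
        }
    }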

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
deleted file mode 100644
index bc18c30..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-package org.apache.kafka.copycat.source;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.copycat.storage.OffsetStorageReader;
-
-/**
- * SourceTaskContext is provided to SourceTasks to allow them to interact with the underlying
- * runtime.
- */
-@InterfaceStability.Unstable
-public interface SourceTaskContext {
-    /**
-     * Get the OffsetStorageReader for this SourceTask.
-     */
-    OffsetStorageReader offsetStorageReader();
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
deleted file mode 100644
index d51b789..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.data.SchemaAndValue;
-
-import java.util.Map;
-
-/**
- * The Converter interface provides support for translating between Copycat's runtime data format
- * and byte[]. Internally, this likely includes an intermediate step to the format used by the serialization
- * layer (e.g. JsonNode, GenericRecord, Message).
- */
-@InterfaceStability.Unstable
-public interface Converter {
-
-    /**
-     * Configure this class.
-     * @param configs configs in key/value pairs
-     * @param isKey whether this converter will be used for keys or values
-     */
-    void configure(Map<String, ?> configs, boolean isKey);
-
-    /**
-     * Convert a Copycat data object to a native object for serialization.
-     * @param topic the topic associated with the data
-     * @param schema the schema for the value
-     * @param value the value to convert
-     * @return the serialized value as a byte array
-     */
-    byte[] fromCopycatData(String topic, Schema schema, Object value);
-
-    /**
-     * Convert a native object to a Copycat data object.
-     * @param topic the topic associated with the data
-     * @param value the value to convert
-     * @return an object containing the {@link Schema} and the converted value
-     */
-    SchemaAndValue toCopycatData(String topic, byte[] value);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
deleted file mode 100644
index 95d2c04..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * <p>
- * OffsetStorageReader provides access to the offset storage used by sources. This can be used by
- * connectors to determine offsets to start consuming data from. This is most commonly used during
- * initialization of a task, but can also be used during runtime, e.g. when reconfiguring a task.
- * </p>
- * <p>
- * Offsets are always defined as Maps of Strings to primitive types, i.e. all types supported by
- * {@link org.apache.kafka.copycat.data.Schema} other than Array, Map, and Struct.
- * </p>
- */
-@InterfaceStability.Unstable
-public interface OffsetStorageReader {
-    /**
-     * Get the offset for the specified partition. If the data isn't already available locally, this
-     * gets it from the backing store, which may require some network round trips.
-     *
-     * @param partition object uniquely identifying the partition of data
-     * @return object uniquely identifying the offset in the partition of data
-     */
-    <T> Map<String, Object> offset(Map<String, T> partition);
-
-    /**
-     * <p>
-     * Get a set of offsets for the specified partition identifiers. This may be more efficient
-     * than calling {@link #offset(Map)} repeatedly.
-     * </p>
-     * <p>
-     * Note that when errors occur, this method omits the associated data and tries to return as
-     * many of the requested values as possible. This allows a task that's managing many partitions to
-     * still proceed with any available data. Therefore, implementations should take care to check
-     * that the data is actually available in the returned response. The only case when an
-     * exception will be thrown is if the entire request failed, e.g. because the underlying
-     * storage was unavailable.
-     * </p>
-     *
-     * @param partitions set of identifiers for partitions of data
-     * @return a map of partition identifiers to decoded offsets
-     */
-    <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions);
-}
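
In practice, a SourceTask combines SourceTaskContext and OffsetStorageReader during start() to
recover its position. A hedged sketch of such a fragment, assuming the partition map keys match
what the task writes into its SourceRecords:

    Map<String, String> partition = Collections.singletonMap("table", "users");
    Map<String, Object> offset = context.offsetStorageReader().offset(partition);
    // offset may be absent on the first run; otherwise resume from the stored value
    Long timestamp = offset == null ? null : (Long) offset.get("timestamp");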

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java
deleted file mode 100644
index 8d708f8..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.data.SchemaAndValue;
-import org.apache.kafka.copycat.errors.DataException;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * {@link Converter} implementation that only supports serializing to strings. When converting Copycat data to bytes,
- * the schema will be ignored and {@link Object#toString()} will always be invoked to convert the data to a String.
- * When converting from bytes to Copycat format, the converter will only ever return an optional string schema and
- * a string or null.
- *
- * Encoding configuration is identical to {@link StringSerializer} and {@link StringDeserializer}, but for convenience
- * this class can also be configured to use the same encoding for both encoding and decoding with the converter.encoding
- * setting.
- */
-public class StringConverter implements Converter {
-    private final StringSerializer serializer = new StringSerializer();
-    private final StringDeserializer deserializer = new StringDeserializer();
-
-    public StringConverter() {
-    }
-
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        Map<String, Object> serializerConfigs = new HashMap<>();
-        serializerConfigs.putAll(configs);
-        Map<String, Object> deserializerConfigs = new HashMap<>();
-        deserializerConfigs.putAll(configs);
-
-        Object encodingValue = configs.get("converter.encoding");
-        if (encodingValue != null) {
-            serializerConfigs.put("serializer.encoding", encodingValue);
-            deserializerConfigs.put("deserializer.encoding", encodingValue);
-        }
-
-        serializer.configure(serializerConfigs, isKey);
-        deserializer.configure(deserializerConfigs, isKey);
-    }
-
-    @Override
-    public byte[] fromCopycatData(String topic, Schema schema, Object value) {
-        try {
-            return serializer.serialize(topic, value == null ? null : value.toString());
-        } catch (SerializationException e) {
-            throw new DataException("Failed to serialize to a string: ", e);
-        }
-    }
-
-    @Override
-    public SchemaAndValue toCopycatData(String topic, byte[] value) {
-        try {
-            return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA, deserializer.deserialize(topic, value));
-        } catch (SerializationException e) {
-            throw new DataException("Failed to deserialize string: ", e);
-        }
-    }
-}
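
Given the javadoc above, round-tripping a value through StringConverter would look roughly like
the following; the topic name is arbitrary and the commented behavior follows the documentation
rather than a verified run:

    StringConverter converter = new StringConverter();
    converter.configure(Collections.<String, Object>emptyMap(), false); // configure as a value converter
    byte[] bytes = converter.fromCopycatData("topic", Schema.STRING_SCHEMA, "hello");
    // deserialization always yields an optional string schema and a string (or null)
    SchemaAndValue result = converter.toCopycatData("topic", bytes);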

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/util/ConnectorUtils.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/util/ConnectorUtils.java b/copycat/api/src/main/java/org/apache/kafka/copycat/util/ConnectorUtils.java
deleted file mode 100644
index f9dd53a..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/util/ConnectorUtils.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Utilities that connector implementations might find useful. Contains common building blocks
- * for writing connectors.
- */
-@InterfaceStability.Unstable
-public class ConnectorUtils {
-    /**
-     * Given a list of elements and a target number of groups, generates a list of groups of
-     * elements to match the target number of groups, spreading them evenly among the groups.
-     * This generates groups with contiguous elements, which results in intuitive ordering if
-     * your elements are also ordered (e.g. alphabetical lists of table names if you sort
-     * table names alphabetically to generate the raw partitions) or can result in efficient
-     * partitioning if elements are sorted according to some criteria that affects performance
-     * (e.g. topic partitions with the same leader).
-     *
-     * @param elements list of elements to partition
-     * @param numGroups the number of output groups to generate.
-     */
-    public static <T> List<List<T>> groupPartitions(List<T> elements, int numGroups) {
-        if (numGroups <= 0)
-            throw new IllegalArgumentException("Number of groups must be positive.");
-
-        List<List<T>> result = new ArrayList<>(numGroups);
-
-        // Each group has either n+1 or n raw partitions
-        int perGroup = elements.size() / numGroups;
-        int leftover = elements.size() - (numGroups * perGroup);
-
-        int assigned = 0;
-        for (int group = 0; group < numGroups; group++) {
-            int numThisGroup = group < leftover ? perGroup + 1 : perGroup;
-            List<T> groupList = new ArrayList<>(numThisGroup);
-            for (int i = 0; i < numThisGroup; i++) {
-                groupList.add(elements.get(assigned));
-                assigned++;
-            }
-            result.add(groupList);
-        }
-
-        return result;
-    }
-}
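
A worked example of groupPartitions: with five elements and two groups, perGroup is 2 and
leftover is 1, so only the first group receives an extra element:

    // groups is [[a, b, c], [d, e]] -- contiguous groups whose sizes differ by at most one
    List<List<String>> groups = ConnectorUtils.groupPartitions(
            Arrays.asList("a", "b", "c", "d", "e"), 2);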

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
deleted file mode 100644
index 7b1e9eb..0000000
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.connector;
-
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-public class ConnectorReconfigurationTest {
-
-    @Test
-    public void testDefaultReconfigure() throws Exception {
-        TestConnector conn = new TestConnector(false);
-        conn.reconfigure(Collections.<String, String>emptyMap());
-        assertEquals(conn.stopOrder, 0);
-        assertEquals(conn.configureOrder, 1);
-    }
-
-    @Test(expected = CopycatException.class)
-    public void testReconfigureStopException() throws Exception {
-        TestConnector conn = new TestConnector(true);
-        conn.reconfigure(Collections.<String, String>emptyMap());
-    }
-
-    private static class TestConnector extends Connector {
-        private boolean stopException;
-        private int order = 0;
-        public int stopOrder = -1;
-        public int configureOrder = -1;
-
-        public TestConnector(boolean stopException) {
-            this.stopException = stopException;
-        }
-
-        @Override
-        public String version() {
-            return "1.0";
-        }
-
-        @Override
-        public void start(Map<String, String> props) {
-            configureOrder = order++;
-        }
-
-        @Override
-        public Class<? extends Task> taskClass() {
-            return null;
-        }
-
-        @Override
-        public List<Map<String, String>> taskConfigs(int count) {
-            return null;
-        }
-
-        @Override
-        public void stop() {
-            stopOrder = order++;
-            if (stopException)
-                throw new CopycatException("error");
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java
deleted file mode 100644
index 4976950..0000000
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.DataException;
-import org.junit.Test;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNull;
-
-public class CopycatSchemaTest {
-    private static final Schema MAP_INT_STRING_SCHEMA = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build();
-    private static final Schema FLAT_STRUCT_SCHEMA = SchemaBuilder.struct()
-            .field("field", Schema.INT32_SCHEMA)
-            .build();
-    private static final Schema STRUCT_SCHEMA = SchemaBuilder.struct()
-            .field("first", Schema.INT32_SCHEMA)
-            .field("second", Schema.STRING_SCHEMA)
-            .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
-            .field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, 
Schema.STRING_SCHEMA).build())
-            .field("nested", FLAT_STRUCT_SCHEMA)
-            .build();
-    private static final Schema PARENT_STRUCT_SCHEMA = SchemaBuilder.struct()
-            .field("nested", FLAT_STRUCT_SCHEMA)
-            .build();
-
-    @Test
-    public void testFieldsOnStructSchema() {
-        Schema schema = SchemaBuilder.struct()
-                .field("foo", Schema.BOOLEAN_SCHEMA)
-                .field("bar", Schema.INT32_SCHEMA)
-                .build();
-
-        assertEquals(2, schema.fields().size());
-        // Validate field lookup by name
-        Field foo = schema.field("foo");
-        assertEquals(0, foo.index());
-        Field bar = schema.field("bar");
-        assertEquals(1, bar.index());
-        // Any other field name should fail
-        assertNull(schema.field("other"));
-    }
-
-
-    @Test(expected = DataException.class)
-    public void testFieldsOnlyValidForStructs() {
-        Schema.INT8_SCHEMA.fields();
-    }
-
-    @Test
-    public void testValidateValueMatchingType() {
-        CopycatSchema.validateValue(Schema.INT8_SCHEMA, (byte) 1);
-        CopycatSchema.validateValue(Schema.INT16_SCHEMA, (short) 1);
-        CopycatSchema.validateValue(Schema.INT32_SCHEMA, 1);
-        CopycatSchema.validateValue(Schema.INT64_SCHEMA, (long) 1);
-        CopycatSchema.validateValue(Schema.FLOAT32_SCHEMA, 1.f);
-        CopycatSchema.validateValue(Schema.FLOAT64_SCHEMA, 1.);
-        CopycatSchema.validateValue(Schema.BOOLEAN_SCHEMA, true);
-        CopycatSchema.validateValue(Schema.STRING_SCHEMA, "a string");
-        CopycatSchema.validateValue(Schema.BYTES_SCHEMA, "a byte 
array".getBytes());
-        CopycatSchema.validateValue(Schema.BYTES_SCHEMA, ByteBuffer.wrap("a 
byte array".getBytes()));
-        
CopycatSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), 
Arrays.asList(1, 2, 3));
-        CopycatSchema.validateValue(
-                SchemaBuilder.map(Schema.INT32_SCHEMA, 
Schema.STRING_SCHEMA).build(),
-                Collections.singletonMap(1, "value")
-        );
-        // Struct tests the basic struct layout + complex field types + nested structs
-        Struct structValue = new Struct(STRUCT_SCHEMA)
-                .put("first", 1)
-                .put("second", "foo")
-                .put("array", Arrays.asList(1, 2, 3))
-                .put("map", Collections.singletonMap(1, "value"))
-                .put("nested", new Struct(FLAT_STRUCT_SCHEMA).put("field", 
12));
-        CopycatSchema.validateValue(STRUCT_SCHEMA, structValue);
-    }
-
-    @Test
-    public void testValidateValueMatchingLogicalType() {
-        CopycatSchema.validateValue(Decimal.schema(2), new BigDecimal(new BigInteger("156"), 2));
-        CopycatSchema.validateValue(Date.SCHEMA, new java.util.Date(0));
-        CopycatSchema.validateValue(Time.SCHEMA, new java.util.Date(0));
-        CopycatSchema.validateValue(Timestamp.SCHEMA, new java.util.Date(0));
-    }
-
-    // To avoid requiring excessive numbers of tests, these checks for invalid types use a similar type where possible
-    // to only include a single test for each type
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchInt8() {
-        CopycatSchema.validateValue(Schema.INT8_SCHEMA, 1);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchInt16() {
-        CopycatSchema.validateValue(Schema.INT16_SCHEMA, 1);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchInt32() {
-        CopycatSchema.validateValue(Schema.INT32_SCHEMA, (long) 1);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchInt64() {
-        CopycatSchema.validateValue(Schema.INT64_SCHEMA, 1);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchFloat() {
-        CopycatSchema.validateValue(Schema.FLOAT32_SCHEMA, 1.0);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchDouble() {
-        CopycatSchema.validateValue(Schema.FLOAT64_SCHEMA, 1.f);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchBoolean() {
-        CopycatSchema.validateValue(Schema.BOOLEAN_SCHEMA, 1.f);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchString() {
-        // CharSequence is a similar type (supertype of String), but we restrict to String.
-        CharBuffer cbuf = CharBuffer.wrap("abc");
-        CopycatSchema.validateValue(Schema.STRING_SCHEMA, cbuf);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchBytes() {
-        CopycatSchema.validateValue(Schema.BYTES_SCHEMA, new Object[]{1, "foo"});
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchArray() {
-        CopycatSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList("a", "b", "c"));
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchArraySomeMatch() {
-        // Even if some match the right type, this should fail if any mismatch. In this case, type erasure loses
-        // the fact that the list is actually List<Object>, but we couldn't tell if only checking the first element
-        CopycatSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, "c"));
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchMapKey() {
-        CopycatSchema.validateValue(MAP_INT_STRING_SCHEMA, Collections.singletonMap("wrong key type", "value"));
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchMapValue() {
-        CopycatSchema.validateValue(MAP_INT_STRING_SCHEMA, Collections.singletonMap(1, 2));
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchMapSomeKeys() {
-        Map<Object, String> data = new HashMap<>();
-        data.put(1, "abc");
-        data.put("wrong", "it's as easy as one two three");
-        CopycatSchema.validateValue(MAP_INT_STRING_SCHEMA, data);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchMapSomeValues() {
-        Map<Integer, Object> data = new HashMap<>();
-        data.put(1, "abc");
-        data.put(2, "wrong".getBytes());
-        CopycatSchema.validateValue(MAP_INT_STRING_SCHEMA, data);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchStructWrongSchema() {
-        // Completely mismatching schemas
-        CopycatSchema.validateValue(
-                FLAT_STRUCT_SCHEMA,
-                new Struct(SchemaBuilder.struct().field("x", Schema.INT32_SCHEMA).build()).put("x", 1)
-        );
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchStructWrongNestedSchema() {
-        // Top-level schema matches, but nested does not.
-        CopycatSchema.validateValue(
-                PARENT_STRUCT_SCHEMA,
-                new Struct(PARENT_STRUCT_SCHEMA)
-                        .put("nested", new Struct(SchemaBuilder.struct().field("x", Schema.INT32_SCHEMA).build()).put("x", 1))
-        );
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchDecimal() {
-        CopycatSchema.validateValue(Decimal.schema(2), new BigInteger("156"));
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchDate() {
-        CopycatSchema.validateValue(Date.SCHEMA, 1000L);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchTime() {
-        CopycatSchema.validateValue(Time.SCHEMA, 1000L);
-    }
-
-    @Test(expected = DataException.class)
-    public void testValidateValueMismatchTimestamp() {
-        CopycatSchema.validateValue(Timestamp.SCHEMA, 1000L);
-    }
-
-    @Test
-    public void testPrimitiveEquality() {
-        // Test that primitive types, which only need to consider all the type & metadata fields, handle equality correctly
-        CopycatSchema s1 = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "doc");
-        CopycatSchema s2 = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "doc");
-        CopycatSchema differentType = new CopycatSchema(Schema.Type.INT16, false, null, "name", 2, "doc");
-        CopycatSchema differentOptional = new CopycatSchema(Schema.Type.INT8, true, null, "name", 2, "doc");
-        CopycatSchema differentDefault = new CopycatSchema(Schema.Type.INT8, false, true, "name", 2, "doc");
-        CopycatSchema differentName = new CopycatSchema(Schema.Type.INT8, false, null, "otherName", 2, "doc");
-        CopycatSchema differentVersion = new CopycatSchema(Schema.Type.INT8, false, null, "name", 4, "doc");
-        CopycatSchema differentDoc = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "other doc");
-        CopycatSchema differentParameters = new CopycatSchema(Schema.Type.INT8, false, null, "name", 2, "doc", Collections.singletonMap("param", "value"), null, null, null);
-
-        assertEquals(s1, s2);
-        assertNotEquals(s1, differentType);
-        assertNotEquals(s1, differentOptional);
-        assertNotEquals(s1, differentDefault);
-        assertNotEquals(s1, differentName);
-        assertNotEquals(s1, differentVersion);
-        assertNotEquals(s1, differentDoc);
-        assertNotEquals(s1, differentParameters);
-    }
-
-    @Test
-    public void testArrayEquality() {
-        // Validate that the value type for the array is tested for equality. This test makes sure the same schema object is
-        // never reused to ensure we're actually checking equality
-        CopycatSchema s1 = new CopycatSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int8().build());
-        CopycatSchema s2 = new CopycatSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int8().build());
-        CopycatSchema differentValueSchema = new CopycatSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int16().build());
-
-        assertEquals(s1, s2);
-        assertNotEquals(s1, differentValueSchema);
-    }
-
-    @Test
-    public void testMapEquality() {
-        // Same as testArrayEquality, but for both key and value schemas
-        CopycatSchema s1 = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build());
-        CopycatSchema s2 = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build());
-        CopycatSchema differentKeySchema = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.string().build(), SchemaBuilder.int16().build());
-        CopycatSchema differentValueSchema = new CopycatSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.string().build());
-
-        assertEquals(s1, s2);
-        assertNotEquals(s1, differentKeySchema);
-        assertNotEquals(s1, differentValueSchema);
-    }
-
-    @Test
-    public void testStructEquality() {
-        // Same as testArrayEquality, but checks differences in fields. Only does a simple check, relying on tests of
-        // Field's equals() method to validate all variations in the list of fields will be checked
-        CopycatSchema s1 = new CopycatSchema(Schema.Type.STRUCT, false, null, null, null, null, null,
-                Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
-                        new Field("field2", 1, SchemaBuilder.int16().build())), null, null);
-        CopycatSchema s2 = new CopycatSchema(Schema.Type.STRUCT, false, null, null, null, null, null,
-                Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
-                        new Field("field2", 1, SchemaBuilder.int16().build())), null, null);
-        CopycatSchema differentField = new CopycatSchema(Schema.Type.STRUCT, false, null, null, null, null, null,
-                Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
-                        new Field("different field name", 1, SchemaBuilder.int16().build())), null, null);
-
-        assertEquals(s1, s2);
-        assertNotEquals(s1, differentField);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/data/DateTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/DateTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/DateTest.java
deleted file mode 100644
index e7885ab..0000000
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/DateTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.DataException;
-import org.junit.Test;
-
-import java.util.Calendar;
-import java.util.GregorianCalendar;
-import java.util.TimeZone;
-
-import static org.junit.Assert.assertEquals;
-
-public class DateTest {
-    private static final GregorianCalendar EPOCH;
-    private static final GregorianCalendar EPOCH_PLUS_TEN_THOUSAND_DAYS;
-    private static final GregorianCalendar EPOCH_PLUS_TIME_COMPONENT;
-    static {
-        EPOCH = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
-        EPOCH.setTimeZone(TimeZone.getTimeZone("UTC"));
-
-        EPOCH_PLUS_TIME_COMPONENT = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 1);
-        EPOCH_PLUS_TIME_COMPONENT.setTimeZone(TimeZone.getTimeZone("UTC"));
-
-        EPOCH_PLUS_TEN_THOUSAND_DAYS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
-        EPOCH_PLUS_TEN_THOUSAND_DAYS.setTimeZone(TimeZone.getTimeZone("UTC"));
-        EPOCH_PLUS_TEN_THOUSAND_DAYS.add(Calendar.DATE, 10000);
-    }
-
-    @Test
-    public void testBuilder() {
-        Schema plain = Date.SCHEMA;
-        assertEquals(Date.LOGICAL_NAME, plain.name());
-        assertEquals(1, (Object) plain.version());
-    }
-
-    @Test
-    public void testFromLogical() {
-        assertEquals(0, Date.fromLogical(Date.SCHEMA, EPOCH.getTime()));
-        assertEquals(10000, Date.fromLogical(Date.SCHEMA, EPOCH_PLUS_TEN_THOUSAND_DAYS.getTime()));
-    }
-
-    @Test(expected = DataException.class)
-    public void testFromLogicalInvalidSchema() {
-        Date.fromLogical(Date.builder().name("invalid").build(), EPOCH.getTime());
-    }
-
-    @Test(expected = DataException.class)
-    public void testFromLogicalInvalidHasTimeComponents() {
-        Date.fromLogical(Date.SCHEMA, EPOCH_PLUS_TIME_COMPONENT.getTime());
-    }
-
-    @Test
-    public void testToLogical() {
-        assertEquals(EPOCH.getTime(), Date.toLogical(Date.SCHEMA, 0));
-        assertEquals(EPOCH_PLUS_TEN_THOUSAND_DAYS.getTime(), Date.toLogical(Date.SCHEMA, 10000));
-    }
-
-    @Test(expected = DataException.class)
-    public void testToLogicalInvalidSchema() {
-        Date.toLogical(Date.builder().name("invalid").build(), 0);
-    }
-}
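
Note on the encoding these tests pin down: the Date logical type represents a
value as the number of whole days since the Unix epoch (midnight UTC,
1970-01-01), stored as an int, and rejects any time-of-day component. A minimal
standalone sketch of that conversion, using only the standard library (the
Copycat Date class itself is what this commit deletes, so the class name below
is illustrative):

    import java.util.Calendar;
    import java.util.GregorianCalendar;
    import java.util.TimeZone;
    import java.util.concurrent.TimeUnit;

    public class DaysSinceEpochSketch {
        public static void main(String[] args) {
            // Same construction as EPOCH_PLUS_TEN_THOUSAND_DAYS in the test above
            GregorianCalendar cal = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
            cal.setTimeZone(TimeZone.getTimeZone("UTC"));
            cal.add(Calendar.DATE, 10000);

            // Whole days since the epoch: millis / 86,400,000 (no DST shifts in UTC)
            long days = TimeUnit.MILLISECONDS.toDays(cal.getTimeInMillis());
            System.out.println(days); // 10000, matching testFromLogical
        }
    }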

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/data/DecimalTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/DecimalTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/DecimalTest.java
deleted file mode 100644
index ce71161..0000000
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/DecimalTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.junit.Test;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.Collections;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-public class DecimalTest {
-    private static final int TEST_SCALE = 2;
-    private static final BigDecimal TEST_DECIMAL = new BigDecimal(new BigInteger("156"), TEST_SCALE);
-    private static final BigDecimal TEST_DECIMAL_NEGATIVE = new BigDecimal(new BigInteger("-156"), TEST_SCALE);
-    private static final byte[] TEST_BYTES = new byte[]{0, -100};
-    private static final byte[] TEST_BYTES_NEGATIVE = new byte[]{-1, 100};
-
-    @Test
-    public void testBuilder() {
-        Schema plain = Decimal.builder(2).build();
-        assertEquals(Decimal.LOGICAL_NAME, plain.name());
-        assertEquals(Collections.singletonMap(Decimal.SCALE_FIELD, "2"), plain.parameters());
-        assertEquals(1, (Object) plain.version());
-    }
-
-    @Test
-    public void testFromLogical() {
-        Schema schema = Decimal.schema(TEST_SCALE);
-        byte[] encoded = Decimal.fromLogical(schema, TEST_DECIMAL);
-        assertArrayEquals(TEST_BYTES, encoded);
-
-        encoded = Decimal.fromLogical(schema, TEST_DECIMAL_NEGATIVE);
-        assertArrayEquals(TEST_BYTES_NEGATIVE, encoded);
-    }
-
-    @Test
-    public void testToLogical() {
-        Schema schema = Decimal.schema(2);
-        BigDecimal converted = Decimal.toLogical(schema, TEST_BYTES);
-        assertEquals(TEST_DECIMAL, converted);
-
-        converted = Decimal.toLogical(schema, TEST_BYTES_NEGATIVE);
-        assertEquals(TEST_DECIMAL_NEGATIVE, converted);
-    }
-}
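
The fixed byte vectors above follow from how the Decimal logical type encodes a
value: the two's-complement, big-endian bytes of the unscaled integer, with the
scale carried as a schema parameter rather than in the payload. A minimal
round-trip sketch using only java.math (class name illustrative; the Copycat
Decimal class is what this diff removes):

    import java.math.BigDecimal;
    import java.math.BigInteger;
    import java.util.Arrays;

    public class DecimalBytesSketch {
        public static void main(String[] args) {
            int scale = 2;
            BigDecimal value = new BigDecimal(new BigInteger("156"), scale); // 1.56

            // Encode: two's-complement big-endian bytes of the unscaled value
            byte[] encoded = value.unscaledValue().toByteArray();
            System.out.println(Arrays.toString(encoded)); // [0, -100], matching TEST_BYTES

            // Decode: rebuild from the bytes plus the externally stored scale
            BigDecimal decoded = new BigDecimal(new BigInteger(encoded), scale);
            System.out.println(decoded.equals(value)); // true
        }
    }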

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/data/FieldTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/FieldTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/FieldTest.java
deleted file mode 100644
index d5458bc..0000000
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/FieldTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-public class FieldTest {
-
-    @Test
-    public void testEquality() {
-        Field field1 = new Field("name", 0, Schema.INT8_SCHEMA);
-        Field field2 = new Field("name", 0, Schema.INT8_SCHEMA);
-        Field differentName = new Field("name2", 0, Schema.INT8_SCHEMA);
-        Field differentIndex = new Field("name", 1, Schema.INT8_SCHEMA);
-        Field differentSchema = new Field("name", 0, Schema.INT16_SCHEMA);
-
-        assertEquals(field1, field2);
-        assertNotEquals(field1, differentName);
-        assertNotEquals(field1, differentIndex);
-        assertNotEquals(field1, differentSchema);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java
deleted file mode 100644
index 183f5fc..0000000
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/SchemaBuilderTest.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.SchemaBuilderException;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-public class SchemaBuilderTest {
-    private static final String NAME = "name";
-    private static final Integer VERSION = 2;
-    private static final String DOC = "doc";
-    private static final Map<String, String> NO_PARAMS = null;
-
-    @Test
-    public void testInt8Builder() {
-        Schema schema = SchemaBuilder.int8().build();
-        assertTypeAndDefault(schema, Schema.Type.INT8, false, null);
-        assertNoMetadata(schema);
-
-        schema = SchemaBuilder.int8().name(NAME).optional().defaultValue((byte) 12)
-                .version(VERSION).doc(DOC).build();
-        assertTypeAndDefault(schema, Schema.Type.INT8, true, (byte) 12);
-        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testInt8BuilderInvalidDefault() {
-        SchemaBuilder.int8().defaultValue("invalid");
-    }
-
-    @Test
-    public void testInt16Builder() {
-        Schema schema = SchemaBuilder.int16().build();
-        assertTypeAndDefault(schema, Schema.Type.INT16, false, null);
-        assertNoMetadata(schema);
-
-        schema = SchemaBuilder.int16().name(NAME).optional().defaultValue((short) 12)
-                .version(VERSION).doc(DOC).build();
-        assertTypeAndDefault(schema, Schema.Type.INT16, true, (short) 12);
-        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testInt16BuilderInvalidDefault() {
-        SchemaBuilder.int16().defaultValue("invalid");
-    }
-
-    @Test
-    public void testInt32Builder() {
-        Schema schema = SchemaBuilder.int32().build();
-        assertTypeAndDefault(schema, Schema.Type.INT32, false, null);
-        assertNoMetadata(schema);
-
-        schema = SchemaBuilder.int32().name(NAME).optional().defaultValue(12)
-                .version(VERSION).doc(DOC).build();
-        assertTypeAndDefault(schema, Schema.Type.INT32, true, 12);
-        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testInt32BuilderInvalidDefault() {
-        SchemaBuilder.int32().defaultValue("invalid");
-    }
-
-    @Test
-    public void testInt64Builder() {
-        Schema schema = SchemaBuilder.int64().build();
-        assertTypeAndDefault(schema, Schema.Type.INT64, false, null);
-        assertNoMetadata(schema);
-
-        schema = SchemaBuilder.int64().name(NAME).optional().defaultValue((long) 12)
-                .version(VERSION).doc(DOC).build();
-        assertTypeAndDefault(schema, Schema.Type.INT64, true, (long) 12);
-        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testInt64BuilderInvalidDefault() {
-        SchemaBuilder.int64().defaultValue("invalid");
-    }
-
-    @Test
-    public void testFloatBuilder() {
-        Schema schema = SchemaBuilder.float32().build();
-        assertTypeAndDefault(schema, Schema.Type.FLOAT32, false, null);
-        assertNoMetadata(schema);
-
-        schema = SchemaBuilder.float32().name(NAME).optional().defaultValue(12.f)
-                .version(VERSION).doc(DOC).build();
-        assertTypeAndDefault(schema, Schema.Type.FLOAT32, true, 12.f);
-        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testFloatBuilderInvalidDefault() {
-        SchemaBuilder.float32().defaultValue("invalid");
-    }
-
-    @Test
-    public void testDoubleBuilder() {
-        Schema schema = SchemaBuilder.float64().build();
-        assertTypeAndDefault(schema, Schema.Type.FLOAT64, false, null);
-        assertNoMetadata(schema);
-
-        schema = SchemaBuilder.float64().name(NAME).optional().defaultValue(12.0)
-                .version(VERSION).doc(DOC).build();
-        assertTypeAndDefault(schema, Schema.Type.FLOAT64, true, 12.0);
-        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testDoubleBuilderInvalidDefault() {
-        SchemaBuilder.float64().defaultValue("invalid");
-    }
-
-    @Test
-    public void testBooleanBuilder() {
-        Schema schema = SchemaBuilder.bool().build();
-        assertTypeAndDefault(schema, Schema.Type.BOOLEAN, false, null);
-        assertNoMetadata(schema);
-
-        schema = SchemaBuilder.bool().name(NAME).optional().defaultValue(true)
-                .version(VERSION).doc(DOC).build();
-        assertTypeAndDefault(schema, Schema.Type.BOOLEAN, true, true);
-        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testBooleanBuilderInvalidDefault() {
-        SchemaBuilder.bool().defaultValue("invalid");
-    }
-
-    @Test
-    public void testStringBuilder() {
-        Schema schema = SchemaBuilder.string().build();
-        assertTypeAndDefault(schema, Schema.Type.STRING, false, null);
-        assertNoMetadata(schema);
-
-        schema = SchemaBuilder.string().name(NAME).optional().defaultValue("a default string")
-                .version(VERSION).doc(DOC).build();
-        assertTypeAndDefault(schema, Schema.Type.STRING, true, "a default string");
-        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testStringBuilderInvalidDefault() {
-        SchemaBuilder.string().defaultValue(true);
-    }
-
-    @Test
-    public void testBytesBuilder() {
-        Schema schema = SchemaBuilder.bytes().build();
-        assertTypeAndDefault(schema, Schema.Type.BYTES, false, null);
-        assertNoMetadata(schema);
-
-        schema = SchemaBuilder.bytes().name(NAME).optional().defaultValue("a default byte array".getBytes())
-                .version(VERSION).doc(DOC).build();
-        assertTypeAndDefault(schema, Schema.Type.BYTES, true, "a default byte array".getBytes());
-        assertMetadata(schema, NAME, VERSION, DOC, NO_PARAMS);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testBytesBuilderInvalidDefault() {
-        SchemaBuilder.bytes().defaultValue("a string, not bytes");
-    }
-
-
-    @Test
-    public void testParameters() {
-        Map<String, String> expectedParameters = new HashMap<>();
-        expectedParameters.put("foo", "val");
-        expectedParameters.put("bar", "baz");
-
-        Schema schema = SchemaBuilder.string().parameter("foo", "val").parameter("bar", "baz").build();
-        assertTypeAndDefault(schema, Schema.Type.STRING, false, null);
-        assertMetadata(schema, null, null, null, expectedParameters);
-
-        schema = SchemaBuilder.string().parameters(expectedParameters).build();
-        assertTypeAndDefault(schema, Schema.Type.STRING, false, null);
-        assertMetadata(schema, null, null, null, expectedParameters);
-    }
-
-
-    @Test
-    public void testStructBuilder() {
-        Schema schema = SchemaBuilder.struct()
-                .field("field1", Schema.INT8_SCHEMA)
-                .field("field2", Schema.INT8_SCHEMA)
-                .build();
-        assertTypeAndDefault(schema, Schema.Type.STRUCT, false, null);
-        assertEquals(2, schema.fields().size());
-        assertEquals("field1", schema.fields().get(0).name());
-        assertEquals(0, schema.fields().get(0).index());
-        assertEquals(Schema.INT8_SCHEMA, schema.fields().get(0).schema());
-        assertEquals("field2", schema.fields().get(1).name());
-        assertEquals(1, schema.fields().get(1).index());
-        assertEquals(Schema.INT8_SCHEMA, schema.fields().get(1).schema());
-        assertNoMetadata(schema);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testNonStructCantHaveFields() {
-        SchemaBuilder.int8().field("field", SchemaBuilder.int8().build());
-    }
-
-
-    @Test
-    public void testArrayBuilder() {
-        Schema schema = SchemaBuilder.array(Schema.INT8_SCHEMA).build();
-        assertTypeAndDefault(schema, Schema.Type.ARRAY, false, null);
-        assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA);
-        assertNoMetadata(schema);
-
-        // Default value
-        List<Byte> defArray = Arrays.asList((byte) 1, (byte) 2);
-        schema = SchemaBuilder.array(Schema.INT8_SCHEMA).defaultValue(defArray).build();
-        assertTypeAndDefault(schema, Schema.Type.ARRAY, false, defArray);
-        assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA);
-        assertNoMetadata(schema);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testArrayBuilderInvalidDefault() {
-        // Array, but wrong embedded type
-        SchemaBuilder.array(Schema.INT8_SCHEMA).defaultValue(Arrays.asList("string")).build();
-    }
-
-    @Test
-    public void testMapBuilder() {
-        Schema schema = SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA).build();
-        assertTypeAndDefault(schema, Schema.Type.MAP, false, null);
-        assertEquals(schema.keySchema(), Schema.INT8_SCHEMA);
-        assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA);
-        assertNoMetadata(schema);
-
-        // Default value
-        Map<Byte, Byte> defMap = Collections.singletonMap((byte) 5, (byte) 10);
-        schema = SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA)
-                .defaultValue(defMap).build();
-        assertTypeAndDefault(schema, Schema.Type.MAP, false, defMap);
-        assertEquals(schema.keySchema(), Schema.INT8_SCHEMA);
-        assertEquals(schema.valueSchema(), Schema.INT8_SCHEMA);
-        assertNoMetadata(schema);
-    }
-
-    @Test(expected = SchemaBuilderException.class)
-    public void testMapBuilderInvalidDefault() {
-        // Map, but wrong embedded type
-        Map<Byte, String> defMap = Collections.singletonMap((byte) 5, "foo");
-        SchemaBuilder.map(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA)
-                .defaultValue(defMap).build();
-    }
-
-
-
-    private void assertTypeAndDefault(Schema schema, Schema.Type type, boolean optional, Object defaultValue) {
-        assertEquals(type, schema.type());
-        assertEquals(optional, schema.isOptional());
-        if (type == Schema.Type.BYTES) {
-            // byte[] is not comparable, need to wrap to check correctly
-            if (defaultValue == null)
-                assertNull(schema.defaultValue());
-            else
-                assertEquals(ByteBuffer.wrap((byte[]) defaultValue), ByteBuffer.wrap((byte[]) schema.defaultValue()));
-        } else {
-            assertEquals(defaultValue, schema.defaultValue());
-        }
-    }
-
-    private void assertMetadata(Schema schema, String name, Integer version, String doc, Map<String, String> parameters) {
-        assertEquals(name, schema.name());
-        assertEquals(version, schema.version());
-        assertEquals(doc, schema.doc());
-        assertEquals(parameters, schema.parameters());
-    }
-
-    private void assertNoMetadata(Schema schema) {
-        assertMetadata(schema, null, null, null, null);
-    }
-}
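
One detail worth noting in assertTypeAndDefault above: Java arrays inherit
Object.equals, so assertEquals on two byte[] values compares references rather
than contents, which is why the helper wraps them in ByteBuffer before
comparing. A small sketch of the difference (class name illustrative):

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    public class ByteArrayEqualitySketch {
        public static void main(String[] args) {
            byte[] a = "a default byte array".getBytes();
            byte[] b = "a default byte array".getBytes();

            System.out.println(a.equals(b));         // false: reference equality only
            System.out.println(Arrays.equals(a, b)); // true: element-wise comparison
            System.out.println(ByteBuffer.wrap(a).equals(ByteBuffer.wrap(b))); // true: content-based
        }
    }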
