http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
deleted file mode 100644
index b270368..0000000
--- 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.util.Callback;
-import org.apache.kafka.copycat.util.ConvertingFutureCallback;
-import org.apache.kafka.copycat.util.KafkaBasedLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * <p>
- *     Implementation of OffsetBackingStore that uses a Kafka topic to store 
offset data.
- * </p>
- * <p>
- *     Internally, this implementation both produces to and consumes from a 
Kafka topic which stores the offsets.
- *     It accepts producer and consumer overrides via its configuration but 
forces some settings to specific values
- *     to ensure correct behavior (e.g. acks, auto.offset.reset).
- * </p>
- */
-public class KafkaOffsetBackingStore implements OffsetBackingStore {
-    private static final Logger log = 
LoggerFactory.getLogger(KafkaOffsetBackingStore.class);
-
-    public final static String OFFSET_STORAGE_TOPIC_CONFIG = 
"offset.storage.topic";
-
-    private KafkaBasedLog<byte[], byte[]> offsetLog;
-    private HashMap<ByteBuffer, ByteBuffer> data;
-
-    @Override
-    public void configure(Map<String, ?> configs) {
-        String topic = (String) configs.get(OFFSET_STORAGE_TOPIC_CONFIG);
-        if (topic == null)
-            throw new CopycatException("Offset storage topic must be 
specified");
-
-        data = new HashMap<>();
-
-        Map<String, Object> producerProps = new HashMap<>();
-        producerProps.putAll(configs);
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
-
-        Map<String, Object> consumerProps = new HashMap<>();
-        consumerProps.putAll(configs);
-        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-
-        offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, 
consumedCallback);
-    }
-
-    @Override
-    public void start() {
-        log.info("Starting KafkaOffsetBackingStore");
-        offsetLog.start();
-        log.info("Finished reading offsets topic and starting 
KafkaOffsetBackingStore");
-    }
-
-    @Override
-    public void stop() {
-        log.info("Stopping KafkaOffsetBackingStore");
-        offsetLog.stop();
-        log.info("Stopped KafkaOffsetBackingStore");
-    }
-
-    @Override
-    public Future<Map<ByteBuffer, ByteBuffer>> get(final 
Collection<ByteBuffer> keys,
-                                                   final 
Callback<Map<ByteBuffer, ByteBuffer>> callback) {
-        ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>> future = 
new ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>>(callback) {
-            @Override
-            public Map<ByteBuffer, ByteBuffer> convert(Void result) {
-                Map<ByteBuffer, ByteBuffer> values = new HashMap<>();
-                for (ByteBuffer key : keys)
-                    values.put(key, data.get(key));
-                return values;
-            }
-        };
-        // This operation may be relatively (but not too) expensive since it 
always requires checking end offsets, even
-        // if we've already read up to the end. However, it also should not be 
common (offsets should only be read when
-        // resetting a task). Always requiring that we read to the end is 
simpler than trying to differentiate when it
-        // is safe not to (which should only be if we *know* we've maintained 
ownership since the last write).
-        offsetLog.readToEnd(future);
-        return future;
-    }
-
-    @Override
-    public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final 
Callback<Void> callback) {
-        SetCallbackFuture producerCallback = new 
SetCallbackFuture(values.size(), callback);
-
-        for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet())
-            offsetLog.send(entry.getKey().array(), entry.getValue().array(), 
producerCallback);
-
-        return producerCallback;
-    }
-
-    private final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = 
new Callback<ConsumerRecord<byte[], byte[]>>() {
-        @Override
-        public void onCompletion(Throwable error, ConsumerRecord<byte[], 
byte[]> record) {
-            ByteBuffer key = record.key() != null ? 
ByteBuffer.wrap(record.key()) : null;
-            ByteBuffer value = record.value() != null ? 
ByteBuffer.wrap(record.value()) : null;
-            data.put(key, value);
-        }
-    };
-
-    private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, 
Map<String, Object> producerProps,
-                                                              Map<String, 
Object> consumerProps, Callback<ConsumerRecord<byte[], byte[]>> 
consumedCallback) {
-        return new KafkaBasedLog<>(topic, producerProps, consumerProps, 
consumedCallback, new SystemTime());
-    }
-
-    private static class SetCallbackFuture implements 
org.apache.kafka.clients.producer.Callback, Future<Void> {
-        private int numLeft;
-        private boolean completed = false;
-        private Throwable exception = null;
-        private final Callback<Void> callback;
-
-        public SetCallbackFuture(int numRecords, Callback<Void> callback) {
-            numLeft = numRecords;
-            this.callback = callback;
-        }
-
-        @Override
-        public synchronized void onCompletion(RecordMetadata metadata, 
Exception exception) {
-            if (exception != null) {
-                if (!completed) {
-                    this.exception = exception;
-                    callback.onCompletion(exception, null);
-                    completed = true;
-                    this.notify();
-                }
-                return;
-            }
-
-            numLeft -= 1;
-            if (numLeft == 0) {
-                callback.onCompletion(null, null);
-                completed = true;
-                this.notify();
-            }
-        }
-
-        @Override
-        public synchronized boolean cancel(boolean mayInterruptIfRunning) {
-            return false;
-        }
-
-        @Override
-        public synchronized boolean isCancelled() {
-            return false;
-        }
-
-        @Override
-        public synchronized boolean isDone() {
-            return completed;
-        }
-
-        @Override
-        public synchronized Void get() throws InterruptedException, 
ExecutionException {
-            while (!completed) {
-                this.wait();
-            }
-            if (exception != null)
-                throw new ExecutionException(exception);
-            return null;
-        }
-
-        @Override
-        public synchronized Void get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException {
-            long started = System.currentTimeMillis();
-            long limit = started + unit.toMillis(timeout);
-            while (!completed) {
-                long leftMs = limit - System.currentTimeMillis();
-                if (leftMs < 0)
-                    throw new TimeoutException("KafkaOffsetBackingStore Future 
timed out.");
-                this.wait(leftMs);
-            }
-            if (exception != null)
-                throw new ExecutionException(exception);
-            return null;
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java
deleted file mode 100644
index 11a1b89..0000000
--- 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.copycat.util.Callback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-/**
- * Implementation of OffsetBackingStore that doesn't actually persist any 
data. To ensure this
- * behaves similarly to a real backing store, operations are executed 
asynchronously on a
- * background thread.
- */
-public class MemoryOffsetBackingStore implements OffsetBackingStore {
-    private static final Logger log = 
LoggerFactory.getLogger(MemoryOffsetBackingStore.class);
-
-    protected Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
-    protected ExecutorService executor = Executors.newSingleThreadExecutor();
-
-    public MemoryOffsetBackingStore() {
-
-    }
-
-    @Override
-    public void configure(Map<String, ?> props) {
-    }
-
-    @Override
-    public synchronized void start() {
-    }
-
-    @Override
-    public synchronized void stop() {
-        // Nothing to do since this doesn't maintain any outstanding 
connections/data
-    }
-
-    @Override
-    public Future<Map<ByteBuffer, ByteBuffer>> get(
-            final Collection<ByteBuffer> keys,
-            final Callback<Map<ByteBuffer, ByteBuffer>> callback) {
-        return executor.submit(new Callable<Map<ByteBuffer, ByteBuffer>>() {
-            @Override
-            public Map<ByteBuffer, ByteBuffer> call() throws Exception {
-                Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
-                synchronized (MemoryOffsetBackingStore.this) {
-                    for (ByteBuffer key : keys) {
-                        result.put(key, data.get(key));
-                    }
-                }
-                if (callback != null)
-                    callback.onCompletion(null, result);
-                return result;
-            }
-        });
-
-    }
-
-    @Override
-    public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values,
-                            final Callback<Void> callback) {
-        return executor.submit(new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                synchronized (MemoryOffsetBackingStore.this) {
-                    for (Map.Entry<ByteBuffer, ByteBuffer> entry : 
values.entrySet()) {
-                        data.put(entry.getKey(), entry.getValue());
-                    }
-                    save();
-                }
-                if (callback != null)
-                    callback.onCompletion(null, null);
-                return null;
-            }
-        });
-    }
-
-    // Hook to allow subclasses to persist data
-    protected void save() {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java
deleted file mode 100644
index 239d9a8..0000000
--- 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.common.Configurable;
-import org.apache.kafka.copycat.util.Callback;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.Future;
-
-/**
- * <p>
- * OffsetBackingStore is an interface for storage backends that store 
key-value data. The backing
- * store doesn't need to handle serialization or deserialization. It only 
needs to support
- * reading/writing bytes. Since it is expected these operations will require 
network
- * operations, only bulk operations are supported.
- * </p>
- * <p>
- * Since OffsetBackingStore is a shared resource that may be used by many 
OffsetStorage instances
- * that are associated with individual tasks, the caller must be sure keys 
include information about the
- * connector so that the shared namespace does not result in conflicting keys.
- * </p>
- */
-public interface OffsetBackingStore extends Configurable {
-
-    /**
-     * Start this offset store.
-     */
-    public void start();
-
-    /**
-     * Stop the backing store. Implementations should attempt to shutdown 
gracefully, but not block
-     * indefinitely.
-     */
-    public void stop();
-
-    /**
-     * Get the values for the specified keys
-     * @param keys list of keys to look up
-     * @param callback callback to invoke on completion
-     * @return future for the resulting map from key to value
-     */
-    public Future<Map<ByteBuffer, ByteBuffer>> get(
-            Collection<ByteBuffer> keys,
-            Callback<Map<ByteBuffer, ByteBuffer>> callback);
-
-    /**
-     * Set the specified keys and values.
-     * @param values map from key to value
-     * @param callback callback to invoke on completion
-     * @return void future for the operation
-     */
-    public Future<Void> set(Map<ByteBuffer, ByteBuffer> values,
-                            Callback<Void> callback);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
deleted file mode 100644
index 84229a5..0000000
--- 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.copycat.data.SchemaAndValue;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Implementation of OffsetStorageReader. Unlike OffsetStorageWriter which is 
implemented
- * directly, the interface is only separate from this implementation because 
it needs to be
- * included in the public API package.
- */
-public class OffsetStorageReaderImpl implements OffsetStorageReader {
-    private static final Logger log = 
LoggerFactory.getLogger(OffsetStorageReaderImpl.class);
-
-    private final OffsetBackingStore backingStore;
-    private final String namespace;
-    private final Converter keyConverter;
-    private final Converter valueConverter;
-
-    public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String 
namespace,
-                                   Converter keyConverter, Converter 
valueConverter) {
-        this.backingStore = backingStore;
-        this.namespace = namespace;
-        this.keyConverter = keyConverter;
-        this.valueConverter = valueConverter;
-    }
-
-    @Override
-    public <T> Map<String, Object> offset(Map<String, T> partition) {
-        return offsets(Arrays.asList(partition)).get(partition);
-    }
-
-    @Override
-    public <T> Map<Map<String, T>, Map<String, Object>> 
offsets(Collection<Map<String, T>> partitions) {
-        // Serialize keys so backing store can work with them
-        Map<ByteBuffer, Map<String, T>> serializedToOriginal = new 
HashMap<>(partitions.size());
-        for (Map<String, T> key : partitions) {
-            try {
-                // Offsets are treated as schemaless, their format is only 
validated here (and the returned value below)
-                OffsetUtils.validateFormat(key);
-                byte[] keySerialized = keyConverter.fromCopycatData(namespace, 
null, Arrays.asList(namespace, key));
-                ByteBuffer keyBuffer = (keySerialized != null) ? 
ByteBuffer.wrap(keySerialized) : null;
-                serializedToOriginal.put(keyBuffer, key);
-            } catch (Throwable t) {
-                log.error("CRITICAL: Failed to serialize partition key when 
getting offsets for task with "
-                        + "namespace {}. No value for this data will be 
returned, which may break the "
-                        + "task or cause it to skip some data.", namespace, t);
-            }
-        }
-
-        // Get serialized key -> serialized value from backing store
-        Map<ByteBuffer, ByteBuffer> raw;
-        try {
-            raw = backingStore.get(serializedToOriginal.keySet(), null).get();
-        } catch (Exception e) {
-            log.error("Failed to fetch offsets from namespace {}: ", 
namespace, e);
-            throw new CopycatException("Failed to fetch offsets.", e);
-        }
-
-        // Deserialize all the values and map back to the original keys
-        Map<Map<String, T>, Map<String, Object>> result = new 
HashMap<>(partitions.size());
-        for (Map.Entry<ByteBuffer, ByteBuffer> rawEntry : raw.entrySet()) {
-            try {
-                // Since null could be a valid key, explicitly check whether 
map contains the key
-                if (!serializedToOriginal.containsKey(rawEntry.getKey())) {
-                    log.error("Should be able to map {} back to a requested 
partition-offset key, backing "
-                            + "store may have returned invalid data", 
rawEntry.getKey());
-                    continue;
-                }
-                Map<String, T> origKey = 
serializedToOriginal.get(rawEntry.getKey());
-                SchemaAndValue deserializedSchemaAndValue = 
valueConverter.toCopycatData(namespace, rawEntry.getValue() != null ? 
rawEntry.getValue().array() : null);
-                Object deserializedValue = deserializedSchemaAndValue.value();
-                OffsetUtils.validateFormat(deserializedValue);
-
-                result.put(origKey, (Map<String, Object>) deserializedValue);
-            } catch (Throwable t) {
-                log.error("CRITICAL: Failed to deserialize offset data when 
getting offsets for task with"
-                        + " namespace {}. No value for this data will be 
returned, which may break the "
-                        + "task or cause it to skip some data. This could 
either be due to an error in "
-                        + "the connector implementation or incompatible 
schema.", namespace, t);
-            }
-        }
-
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
deleted file mode 100644
index 59c12a7..0000000
--- 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.util.Callback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.Future;
-
-/**
- * <p>
- * OffsetStorageWriter is a buffered writer that wraps the simple 
OffsetBackingStore interface.
- * It maintains a copy of the key-value data in memory and buffers writes. It 
allows you to take
- * a snapshot, which can then be asynchronously flushed to the backing store 
while new writes
- * continue to be processed. This allows Copycat to process offset commits in 
the background
- * while continuing to process messages.
- * </p>
- * <p>
- * Copycat uses an OffsetStorage implementation to save state about the 
current progress of
- * source (import to Kafka) jobs, which may have many input partitions and 
"offsets" may not be as
- * simple as they are for Kafka partitions or files. Offset storage is not 
required for sink jobs
- * because they can use Kafka's native offset storage (or the sink data store 
can handle offset
- * storage to achieve exactly once semantics).
- * </p>
- * <p>
- * Both partitions and offsets are generic data objects. This allows different 
connectors to use
- * whatever representation they need, even arbitrarily complex records. These 
are translated
- * internally into the serialized form the OffsetBackingStore uses.
- * </p>
- * <p>
- * Note that this only provides write functionality. This is intentional to 
ensure stale data is
- * never read. Offset data should only be read during startup or 
reconfiguration of a task. By
- * always serving those requests by reading the values from the backing store, 
we ensure we never
- * accidentally use stale data. (One example of how this can occur: a task is 
processing input
- * partition A, writing offsets; reconfiguration causes partition A to be 
reassigned elsewhere;
- * reconfiguration causes partition A to be reassigned to this node, but now 
the offset data is out
- * of date). Since these offsets are created and managed by the connector 
itself, there's no way
- * for the offset management layer to know which keys are "owned" by which 
tasks at any given
- * time.
- * </p>
- * <p>
- * This class is not thread-safe. It should only be accessed from a Task's 
processing thread.
- * </p>
- */
-public class OffsetStorageWriter {
-    private static final Logger log = 
LoggerFactory.getLogger(OffsetStorageWriter.class);
-
-    private final OffsetBackingStore backingStore;
-    private final Converter keyConverter;
-    private final Converter valueConverter;
-    private final String namespace;
-    // Offset data in Copycat format
-    private Map<Map<String, Object>, Map<String, Object>> data = new 
HashMap<>();
-
-    // Not synchronized, should only be accessed by flush thread
-    private Map<Map<String, Object>, Map<String, Object>> toFlush = null;
-    // Unique ID for each flush request to handle callbacks after timeouts
-    private long currentFlushId = 0;
-
-    public OffsetStorageWriter(OffsetBackingStore backingStore,
-                               String namespace, Converter keyConverter, 
Converter valueConverter) {
-        this.backingStore = backingStore;
-        this.namespace = namespace;
-        this.keyConverter = keyConverter;
-        this.valueConverter = valueConverter;
-    }
-
-    /**
-     * Set an offset for a partition using Copycat data values
-     * @param partition the partition to store an offset for
-     * @param offset the offset
-     */
-    public synchronized void offset(Map<String, ?> partition, Map<String, ?> 
offset) {
-        data.put((Map<String, Object>) partition, (Map<String, Object>) 
offset);
-    }
-
-    private boolean flushing() {
-        return toFlush != null;
-    }
-
-    /**
-     * Performs the first step of a flush operation, snapshotting the current 
state. This does not
-     * actually initiate the flush with the underlying storage.
-     *
-     * @return true if a flush was initiated, false if no data was available
-     */
-    public synchronized boolean beginFlush() {
-        if (flushing()) {
-            log.error("Invalid call to OffsetStorageWriter flush() while 
already flushing, the "
-                    + "framework should not allow this");
-            throw new CopycatException("OffsetStorageWriter is already 
flushing");
-        }
-
-        if (data.isEmpty())
-            return false;
-
-        assert !flushing();
-        toFlush = data;
-        data = new HashMap<>();
-        return true;
-    }
-
-    /**
-     * Flush the current offsets and clear them from this writer. This is 
non-blocking: it
-     * moves the current set of offsets out of the way, serializes the data, 
and asynchronously
-     * writes the data to the backing store. If no offsets need to be written, 
the callback is
-     * still invoked, but no Future is returned.
-     *
-     * @return a Future, or null if there are no offsets to commitOffsets
-     */
-    public Future<Void> doFlush(final Callback<Void> callback) {
-        final long flushId = currentFlushId;
-
-        // Serialize
-        Map<ByteBuffer, ByteBuffer> offsetsSerialized;
-        try {
-            offsetsSerialized = new HashMap<>();
-            for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : 
toFlush.entrySet()) {
-                // Offsets are specified as schemaless to the converter, using 
whatever internal schema is appropriate
-                // for that data. The only enforcement of the format is here.
-                OffsetUtils.validateFormat(entry.getKey());
-                OffsetUtils.validateFormat(entry.getValue());
-                // When serializing the key, we add in the namespace 
information so the key is [namespace, real key]
-                byte[] key = keyConverter.fromCopycatData(namespace, null, 
Arrays.asList(namespace, entry.getKey()));
-                ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : 
null;
-                byte[] value = valueConverter.fromCopycatData(namespace, null, 
entry.getValue());
-                ByteBuffer valueBuffer = (value != null) ? 
ByteBuffer.wrap(value) : null;
-                offsetsSerialized.put(keyBuffer, valueBuffer);
-            }
-        } catch (Throwable t) {
-            // Must handle errors properly here or the writer will be left 
mid-flush forever and be
-            // unable to make progress.
-            log.error("CRITICAL: Failed to serialize offset data, making it 
impossible to commit "
-                    + "offsets under namespace {}. This likely won't recover 
unless the "
-                    + "unserializable partition or offset information is 
overwritten.", namespace);
-            log.error("Cause of serialization failure:", t);
-            callback.onCompletion(t, null);
-            return null;
-        }
-
-        // And submit the data
-        log.debug("Submitting {} entries to backing store", 
offsetsSerialized.size());
-        return backingStore.set(offsetsSerialized, new Callback<Void>() {
-            @Override
-            public void onCompletion(Throwable error, Void result) {
-                boolean isCurrent = handleFinishWrite(flushId, error, result);
-                if (isCurrent && callback != null)
-                    callback.onCompletion(error, result);
-            }
-        });
-    }
-
-    /**
-     * Cancel a flush that has been initiated by {@link #beginFlush}. This 
should not be called if
-     * {@link #doFlush} has already been invoked. It should be used if an 
operation performed
-     * between beginFlush and doFlush failed.
-     */
-    public synchronized void cancelFlush() {
-        // Verify we're still flushing data to handle a race between 
cancelFlush() calls from up the
-        // call stack and callbacks from the write request to underlying 
storage
-        if (flushing()) {
-            // Just recombine the data and place it back in the primary storage
-            toFlush.putAll(data);
-            data = toFlush;
-            currentFlushId++;
-            toFlush = null;
-        }
-    }
-
-    /**
-     * Handle completion of a write. Returns true if this callback is for the 
current flush
-     * operation, false if it's for an old operation that should now be 
ignored.
-     */
-    private synchronized boolean handleFinishWrite(long flushId, Throwable 
error, Void result) {
-        // Callbacks need to be handled carefully since the flush operation 
may have already timed
-        // out and been cancelled.
-        if (flushId != currentFlushId)
-            return false;
-
-        if (error != null) {
-            cancelFlush();
-        } else {
-            currentFlushId++;
-            toFlush = null;
-        }
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
deleted file mode 100644
index 9ba7662..0000000
--- 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.copycat.data.CopycatSchema;
-import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.errors.DataException;
-
-import java.util.Map;
-
-public class OffsetUtils {
-    public static void validateFormat(Object offsetData) {
-        if (offsetData == null)
-            return;
-
-        if (!(offsetData instanceof Map))
-            throw new DataException("Offsets must be specified as a Map");
-        validateFormat((Map<Object, Object>) offsetData);
-    }
-
-    public static <K, V> void validateFormat(Map<K, V> offsetData) {
-        // Both keys and values for offsets may be null. For values, this is a useful way to delete offsets or indicate
-        // that there's not usable concept of offsets in your source system.
-        if (offsetData == null)
-            return;
-
-        for (Map.Entry<K, V> entry : offsetData.entrySet()) {
-            if (!(entry.getKey() instanceof String))
-                throw new DataException("Offsets may only use String keys");
-
-            Object value = entry.getValue();
-            if (value == null)
-                continue;
-            Schema.Type schemaType = CopycatSchema.schemaType(value.getClass());
-            if (!schemaType.isPrimitive())
-                throw new DataException("Offsets may only contain primitive types as values, but field " + entry.getKey() + " contains " + schemaType);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java
deleted file mode 100644
index 5cf1423..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-/**
- * Generic interface for callbacks
- */
-public interface Callback<V> {
-    /**
-     * Invoked upon completion of the operation.
-     *
-     * @param error the error that caused the operation to fail, or null if no error occurred
-     * @param result the return value, or null if the operation failed
-     */
-    void onCompletion(Throwable error, V result);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
deleted file mode 100644
index d4cf824..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.Serializable;
-
-/**
- * Unique ID for a single task. It includes a unique connector ID and a task ID that is unique within
- * the connector.
- */
-public class ConnectorTaskId implements Serializable, Comparable<ConnectorTaskId> {
-    private final String connector;
-    private final int task;
-
-    @JsonCreator
-    public ConnectorTaskId(@JsonProperty("connector") String connector, @JsonProperty("task") int task) {
-        this.connector = connector;
-        this.task = task;
-    }
-
-    @JsonProperty
-    public String connector() {
-        return connector;
-    }
-
-    @JsonProperty
-    public int task() {
-        return task;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o)
-            return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        ConnectorTaskId that = (ConnectorTaskId) o;
-
-        if (task != that.task)
-            return false;
-        if (connector != null ? !connector.equals(that.connector) : that.connector != null)
-            return false;
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        int result = connector != null ? connector.hashCode() : 0;
-        result = 31 * result + task;
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return connector + '-' + task;
-    }
-
-    @Override
-    public int compareTo(ConnectorTaskId o) {
-        int connectorCmp = connector.compareTo(o.connector);
-        if (connectorCmp != 0)
-            return connectorCmp;
-        return ((Integer) task).compareTo(o.task);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java
deleted file mode 100644
index 862adf9..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConvertingFutureCallback.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-public abstract class ConvertingFutureCallback<U, T> implements Callback<U>, Future<T> {
-
-    private Callback<T> underlying;
-    private CountDownLatch finishedLatch;
-    private T result = null;
-    private Throwable exception = null;
-
-    public ConvertingFutureCallback(Callback<T> underlying) {
-        this.underlying = underlying;
-        this.finishedLatch = new CountDownLatch(1);
-    }
-
-    public abstract T convert(U result);
-
-    @Override
-    public void onCompletion(Throwable error, U result) {
-        this.exception = error;
-        this.result = convert(result);
-        if (underlying != null)
-            underlying.onCompletion(error, this.result);
-        finishedLatch.countDown();
-    }
-
-    @Override
-    public boolean cancel(boolean b) {
-        return false;
-    }
-
-    @Override
-    public boolean isCancelled() {
-        return false;
-    }
-
-    @Override
-    public boolean isDone() {
-        return finishedLatch.getCount() == 0;
-    }
-
-    @Override
-    public T get() throws InterruptedException, ExecutionException {
-        finishedLatch.await();
-        return result();
-    }
-
-    @Override
-    public T get(long l, TimeUnit timeUnit)
-            throws InterruptedException, ExecutionException, TimeoutException {
-        if (!finishedLatch.await(l, timeUnit))
-            throw new TimeoutException("Timed out waiting for future");
-        return result();
-    }
-
-    private T result() throws ExecutionException {
-        if (exception != null) {
-            throw new ExecutionException(exception);
-        }
-        return result;
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
deleted file mode 100644
index 269482c..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-public class FutureCallback<T> extends ConvertingFutureCallback<T, T> {
-
-    public FutureCallback(Callback<T> underlying) {
-        super(underlying);
-    }
-
-    public FutureCallback() {
-        super(null);
-    }
-
-    @Override
-    public T convert(T result) {
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
deleted file mode 100644
index f5e72d3..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
+++ /dev/null
@@ -1,331 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.Future;
-
-/**
- * <p>
- *     KafkaBasedLog provides a generic implementation of a shared, compacted log of records stored in Kafka that all
- *     clients need to consume and, at times, agree on their offset / that they have read to the end of the log.
- * </p>
- * <p>
- *     This functionality is useful for storing different types of data that all clients may need to agree on --
- *     offsets or config for example. This class runs a consumer in a background thread to continuously tail the target
- *     topic, accepts write requests which it writes to the topic using an internal producer, and provides some helpful
- *     utilities like checking the current log end offset and waiting until the current end of the log is reached.
- * </p>
- * <p>
- *     To support different use cases, this class works with either single- or multi-partition topics.
- * </p>
- * <p>
- *     Since this class is generic, it delegates the details of data storage via a callback that is invoked for each
- *     record that is consumed from the topic. The invocation of callbacks is guaranteed to be serialized -- if the
- *     calling class keeps track of state based on the log and only writes to it when consume callbacks are invoked
- *     and only reads it in {@link #readToEnd(Callback)} callbacks then no additional synchronization will be required.
- * </p>
- */
-public class KafkaBasedLog<K, V> {
-    private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
-    private static final long CREATE_TOPIC_TIMEOUT_MS = 30000;
-
-    private Time time;
-    private final String topic;
-    private final Map<String, Object> producerConfigs;
-    private final Map<String, Object> consumerConfigs;
-    private final Callback<ConsumerRecord<K, V>> consumedCallback;
-    private Consumer<K, V> consumer;
-    private Producer<K, V> producer;
-
-    private Thread thread;
-    private boolean stopRequested;
-    private Queue<Callback<Void>> readLogEndOffsetCallbacks;
-
-    /**
-     * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until
-     * {@link #start()} is invoked.
-     *
-     * @param topic the topic to treat as a log
-     * @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must
-     *                        contain compatible serializer settings for the generic types used on this class. Some
-     *                        setting, such as the number of acks, will be overridden to ensure correct behavior of this
-     *                        class.
-     * @param consumerConfigs configuration options to use when creating the internal consumer. At a minimum this must
-     *                        contain compatible serializer settings for the generic types used on this class. Some
-     *                        setting, such as the auto offset reset policy, will be overridden to ensure correct
-     *                        behavior of this class.
-     * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log
-     * @param time Time interface
-     */
-    public KafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs,
-                         Callback<ConsumerRecord<K, V>> consumedCallback, Time time) {
-        this.topic = topic;
-        this.producerConfigs = producerConfigs;
-        this.consumerConfigs = consumerConfigs;
-        this.consumedCallback = consumedCallback;
-        this.stopRequested = false;
-        this.readLogEndOffsetCallbacks = new ArrayDeque<>();
-        this.time = time;
-    }
-
-    public void start() {
-        log.info("Starting KafkaBasedLog with topic " + topic);
-
-        producer = createProducer();
-        consumer = createConsumer();
-
-        List<TopicPartition> partitions = new ArrayList<>();
-
-        // Until we have admin utilities we can use to check for the existence of this topic and create it if it is missing,
-        // we rely on topic auto-creation
-        List<PartitionInfo> partitionInfos = null;
-        long started = time.milliseconds();
-        while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
-            partitionInfos = consumer.partitionsFor(topic);
-            Utils.sleep(Math.min(time.milliseconds() - started, 1000));
-        }
-        if (partitionInfos == null)
-            throw new CopycatException("Could not look up partition metadata for offset backing store topic in" +
-                    " allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" +
-                    " this is your first use of the topic it may have taken too long to create.");
-
-        for (PartitionInfo partition : partitionInfos)
-            partitions.add(new TopicPartition(partition.topic(), partition.partition()));
-        consumer.assign(partitions);
-
-        readToLogEnd();
-
-        thread = new WorkThread();
-        thread.start();
-
-        log.info("Finished reading KafakBasedLog for topic " + topic);
-
-        log.info("Started KafakBasedLog for topic " + topic);
-    }
-
-    public void stop() {
-        log.info("Stopping KafkaBasedLog for topic " + topic);
-
-        synchronized (this) {
-            stopRequested = true;
-        }
-        consumer.wakeup();
-
-        try {
-            thread.join();
-        } catch (InterruptedException e) {
-            throw new CopycatException("Failed to stop KafkaBasedLog. Exiting without cleanly shutting " +
-                    "down it's producer and consumer.", e);
-        }
-
-        try {
-            producer.close();
-        } catch (KafkaException e) {
-            log.error("Failed to stop KafkaBasedLog producer", e);
-        }
-
-        try {
-            consumer.close();
-        } catch (KafkaException e) {
-            log.error("Failed to stop KafkaBasedLog consumer", e);
-        }
-
-        log.info("Stopped KafkaBasedLog for topic " + topic);
-    }
-
-    /**
-     * Flushes any outstanding writes and then reads to the current end of the log and invokes the specified callback.
-     * Note that this checks the current, offsets, reads to them, and invokes the callback regardless of whether
-     * additional records have been written to the log. If the caller needs to ensure they have truly reached the end
-     * of the log, they must ensure there are no other writers during this period.
-     *
-     * This waits until the end of all partitions has been reached.
-     *
-     * This method is asynchronous. If you need a synchronous version, pass an instance of
-     * {@link org.apache.kafka.copycat.util.FutureCallback} as the {@param callback} parameter and wait on it to block.
-     *
-     * @param callback the callback to invoke once the end of the log has been reached.
-     */
-    public void readToEnd(Callback<Void> callback) {
-        producer.flush();
-        synchronized (this) {
-            readLogEndOffsetCallbacks.add(callback);
-        }
-        consumer.wakeup();
-    }
-
-    /**
-     * Same as {@link #readToEnd(Callback)} but provides a {@link Future} instead of using a callback.
-     * @return the future associated with the operation
-     */
-    public Future<Void> readToEnd() {
-        FutureCallback<Void> future = new FutureCallback<>(null);
-        readToEnd(future);
-        return future;
-    }
-
-    public void send(K key, V value) {
-        send(key, value, null);
-    }
-
-    public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
-        producer.send(new ProducerRecord<>(topic, key, value), callback);
-    }
-
-
-    private Producer<K, V> createProducer() {
-        // Always require producer acks to all to ensure durable writes
-        producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
-        return new KafkaProducer<>(producerConfigs);
-    }
-
-    private Consumer<K, V> createConsumer() {
-        // Always force reset to the beginning of the log since this class wants to consume all available log data
-        consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        return new KafkaConsumer<>(consumerConfigs);
-    }
-
-    private void poll(long timeoutMs) {
-        try {
-            ConsumerRecords<K, V> records = consumer.poll(timeoutMs);
-            for (ConsumerRecord<K, V> record : records)
-                consumedCallback.onCompletion(null, record);
-        } catch (WakeupException e) {
-            // Expected on get() or stop(). The calling code should handle this
-            throw e;
-        } catch (KafkaException e) {
-            log.error("Error polling: " + e);
-        }
-    }
-
-    private void readToLogEnd() {
-        log.trace("Reading to end of offset log");
-
-        Set<TopicPartition> assignment = consumer.assignment();
-
-        // This approach to getting the current end offset is hacky until we have an API for looking these up directly
-        Map<TopicPartition, Long> offsets = new HashMap<>();
-        for (TopicPartition tp : assignment) {
-            long offset = consumer.position(tp);
-            offsets.put(tp, offset);
-            consumer.seekToEnd(tp);
-        }
-
-        Map<TopicPartition, Long> endOffsets = new HashMap<>();
-        try {
-            poll(0);
-        } finally {
-            // If there is an exception, even a possibly expected one like WakeupException, we need to make sure
-            // the consumers position is reset or it'll get into an inconsistent state.
-            for (TopicPartition tp : assignment) {
-                long startOffset = offsets.get(tp);
-                long endOffset = consumer.position(tp);
-                if (endOffset > startOffset) {
-                    endOffsets.put(tp, endOffset);
-                    consumer.seek(tp, startOffset);
-                }
-                log.trace("Reading to end of log for {}: starting offset {} to ending offset {}", tp, startOffset, endOffset);
-            }
-        }
-
-        while (!endOffsets.isEmpty()) {
-            poll(Integer.MAX_VALUE);
-
-            Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
-            while (it.hasNext()) {
-                Map.Entry<TopicPartition, Long> entry = it.next();
-                if (consumer.position(entry.getKey()) >= entry.getValue())
-                    it.remove();
-                else
-                    break;
-            }
-        }
-    }
-
-
-    private class WorkThread extends Thread {
-        @Override
-        public void run() {
-            try {
-                while (true) {
-                    int numCallbacks;
-                    synchronized (KafkaBasedLog.this) {
-                        if (stopRequested)
-                            break;
-                        numCallbacks = readLogEndOffsetCallbacks.size();
-                    }
-
-                    if (numCallbacks > 0) {
-                        try {
-                            readToLogEnd();
-                        } catch (WakeupException e) {
-                            // Either received another get() call and need to retry reading to end of log or stop() was
-                            // called. Both are handled by restarting this loop.
-                            continue;
-                        }
-                    }
-
-                    synchronized (KafkaBasedLog.this) {
-                        // Only invoke exactly the number of callbacks we found before triggering the read to log end
-                        // since it is possible for another write + readToEnd to sneak in in the meantime
-                        for (int i = 0; i < numCallbacks; i++) {
-                            Callback<Void> cb = readLogEndOffsetCallbacks.poll();
-                            cb.onCompletion(null, null);
-                        }
-                    }
-
-                    try {
-                        poll(Integer.MAX_VALUE);
-                    } catch (WakeupException e) {
-                        // See previous comment, both possible causes of this wakeup are handled by starting this loop again
-                        continue;
-                    }
-                }
-            } catch (Throwable t) {
-                log.error("Unexpected exception in KafkaBasedLog's work thread", t);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java
deleted file mode 100644
index 3e23f29..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * <p>
- * Thread class with support for triggering graceful and forcible shutdown. In graceful shutdown,
- * a flag is set, which the thread should detect and try to exit gracefully from. In forcible
- * shutdown, the thread is interrupted. These can be combined to give a thread a chance to exit
- * gracefully, but then force it to exit if it takes too long.
- * </p>
- * <p>
- * Implementations should override the {@link #execute} method and check {@link #getRunning} to
- * determine whether they should try to gracefully exit.
- * </p>
- */
-public abstract class ShutdownableThread extends Thread {
-    private static final Logger log = LoggerFactory.getLogger(ShutdownableThread.class);
-
-    private AtomicBoolean isRunning = new AtomicBoolean(true);
-    private CountDownLatch shutdownLatch = new CountDownLatch(1);
-
-    /**
-     * An UncaughtExceptionHandler to register on every instance of this class. This is useful for
-     * testing, where AssertionExceptions in the thread may not cause the test to fail. Since one
-     * instance is used for all threads, it must be thread-safe.
-     */
-    volatile public static UncaughtExceptionHandler funcaughtExceptionHandler = null;
-
-    public ShutdownableThread(String name) {
-        // The default is daemon=true so that these threads will not prevent shutdown. We use this
-        // default because threads that are running user code that may not clean up properly, even
-        // when we attempt to forcibly shut them down.
-        this(name, true);
-    }
-
-    public ShutdownableThread(String name, boolean daemon) {
-        super(name);
-        this.setDaemon(daemon);
-        if (funcaughtExceptionHandler != null)
-            this.setUncaughtExceptionHandler(funcaughtExceptionHandler);
-    }
-
-    /**
-     * Implementations should override this method with the main body for the thread.
-     */
-    public abstract void execute();
-
-    /**
-     * Returns true if the thread hasn't exited yet and none of the shutdown methods have been
-     * invoked
-     */
-    public boolean getRunning() {
-        return isRunning.get();
-    }
-
-    @Override
-    public void run() {
-        try {
-            execute();
-        } catch (Error | RuntimeException e) {
-            log.error("Thread {} exiting with uncaught exception: ", getName(), e);
-            throw e;
-        } finally {
-            shutdownLatch.countDown();
-        }
-    }
-
-    /**
-     * Shutdown the thread, first trying to shut down gracefully using the specified timeout, then
-     * forcibly interrupting the thread.
-     * @param gracefulTimeout the maximum time to wait for a graceful exit
-     * @param unit the time unit of the timeout argument
-     */
-    public void shutdown(long gracefulTimeout, TimeUnit unit)
-            throws InterruptedException {
-        boolean success = gracefulShutdown(gracefulTimeout, unit);
-        if (!success)
-            forceShutdown();
-    }
-
-    /**
-     * Attempt graceful shutdown
-     * @param timeout the maximum time to wait
-     * @param unit the time unit of the timeout argument
-     * @return true if successful, false if the timeout elapsed
-     */
-    public boolean gracefulShutdown(long timeout, TimeUnit unit) throws InterruptedException {
-        startGracefulShutdown();
-        return awaitShutdown(timeout, unit);
-    }
-
-    /**
-     * Start shutting down this thread gracefully, but do not block waiting for it to exit.
-     */
-    public void startGracefulShutdown() {
-        log.info("Starting graceful shutdown of thread {}", getName());
-        isRunning.set(false);
-    }
-
-    /**
-     * Awaits shutdown of this thread, waiting up to the timeout.
-     * @param timeout the maximum time to wait
-     * @param unit the time unit of the timeout argument
-     * @return true if successful, false if the timeout elapsed
-     * @throws InterruptedException
-     */
-    public boolean awaitShutdown(long timeout, TimeUnit unit) throws InterruptedException {
-        return shutdownLatch.await(timeout, unit);
-    }
-
-    /**
-     * Immediately tries to force the thread to shut down by interrupting it. This does not try to
-     * wait for the thread to truly exit because forcible shutdown is not always possible. By
-     * default, threads are marked as daemon threads so they will not prevent the process from
-     * exiting.
-     */
-    public void forceShutdown() throws InterruptedException {
-        log.info("Forcing shutdown of thread {}", getName());
-        isRunning.set(false);
-        interrupt();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
deleted file mode 100644
index 0458054..0000000
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime;
-
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.data.SchemaAndValue;
-import org.apache.kafka.copycat.errors.RetriableException;
-import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig;
-import org.apache.kafka.copycat.sink.SinkConnector;
-import org.apache.kafka.copycat.sink.SinkRecord;
-import org.apache.kafka.copycat.sink.SinkTask;
-import org.apache.kafka.copycat.storage.Converter;
-import org.apache.kafka.copycat.util.ConnectorTaskId;
-import org.apache.kafka.copycat.util.MockTime;
-import org.easymock.Capture;
-import org.easymock.CaptureType;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(WorkerSinkTask.class)
-@PowerMockIgnore("javax.management.*")
-public class WorkerSinkTaskTest {
-    // These are fixed to keep this code simpler. In this example we assume 
byte[] raw values
-    // with mix of integer/string in Copycat
-    private static final String TOPIC = "test";
-    private static final int PARTITION = 12;
-    private static final int PARTITION2 = 13;
-    private static final long FIRST_OFFSET = 45;
-    private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
-    private static final int KEY = 12;
-    private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
-    private static final String VALUE = "VALUE";
-    private static final byte[] RAW_KEY = "key".getBytes();
-    private static final byte[] RAW_VALUE = "value".getBytes();
-
-    private static final TopicPartition TOPIC_PARTITION = new 
TopicPartition(TOPIC, PARTITION);
-    private static final TopicPartition TOPIC_PARTITION2 = new 
TopicPartition(TOPIC, PARTITION2);
-
-    private static final Map<String, String> TASK_PROPS = new HashMap<>();
-    static {
-        TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
-    }
-
-
-    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
-    private Time time;
-    private WorkerSinkTask workerTask;
-    @Mock
-    private SinkTask sinkTask;
-    private Capture<WorkerSinkTaskContext> sinkTaskContext = 
EasyMock.newCapture();
-    private WorkerConfig workerConfig;
-    @Mock
-    private Converter keyConverter;
-    @Mock
-    private Converter valueConverter;
-    @Mock
-    private WorkerSinkTaskThread workerThread;
-    @Mock
-    private KafkaConsumer<byte[], byte[]> consumer;
-    private Capture<ConsumerRebalanceListener> rebalanceListener = 
EasyMock.newCapture();
-
-    private long recordsReturned;
-
-    @Before
-    public void setUp() {
-        time = new MockTime();
-        Map<String, String> workerProps = new HashMap<>();
-        workerProps.put("key.converter", 
"org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.put("value.converter", 
"org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.put("internal.key.converter", 
"org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.put("internal.value.converter", 
"org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.put("internal.key.converter.schemas.enable", "false");
-        workerProps.put("internal.value.converter.schemas.enable", "false");
-        workerConfig = new StandaloneConfig(workerProps);
-        workerTask = PowerMock.createPartialMock(
-                WorkerSinkTask.class, new String[]{"createConsumer", 
"createWorkerThread"},
-                taskId, sinkTask, workerConfig, keyConverter, valueConverter, 
time);
-
-        recordsReturned = 0;
-    }
-
-    @Test
-    public void testPollRedelivery() throws Exception {
-        expectInitializeTask();
-
-        // If a retriable exception is thrown, we should redeliver the same 
batch, pausing the consumer in the meantime
-        expectConsumerPoll(1);
-        expectConvertMessages(1);
-        Capture<Collection<SinkRecord>> records = 
EasyMock.newCapture(CaptureType.ALL);
-        sinkTask.put(EasyMock.capture(records));
-        EasyMock.expectLastCall().andThrow(new RetriableException("retry"));
-        // Pause
-        EasyMock.expect(consumer.assignment()).andReturn(new 
HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)));
-        consumer.pause(TOPIC_PARTITION);
-        PowerMock.expectLastCall();
-        consumer.pause(TOPIC_PARTITION2);
-        PowerMock.expectLastCall();
-
-        // Retry delivery should succeed
-        expectConsumerPoll(0);
-        sinkTask.put(EasyMock.capture(records));
-        EasyMock.expectLastCall();
-        // And unpause
-        EasyMock.expect(consumer.assignment()).andReturn(new 
HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)));
-        consumer.resume(TOPIC_PARTITION);
-        PowerMock.expectLastCall();
-        consumer.resume(TOPIC_PARTITION2);
-        PowerMock.expectLastCall();
-
-        PowerMock.replayAll();
-
-        workerTask.start(TASK_PROPS);
-        workerTask.joinConsumerGroupAndStart();
-        workerTask.poll(Long.MAX_VALUE);
-        workerTask.poll(Long.MAX_VALUE);
-
-        PowerMock.verifyAll();
-    }
-
-
-    private void expectInitializeTask() throws Exception {
-        PowerMock.expectPrivate(workerTask, 
"createConsumer").andReturn(consumer);
-        PowerMock.expectPrivate(workerTask, "createWorkerThread")
-                .andReturn(workerThread);
-        workerThread.start();
-        PowerMock.expectLastCall();
-
-        consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), 
EasyMock.capture(rebalanceListener));
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new 
IAnswer<ConsumerRecords<byte[], byte[]>>() {
-            @Override
-            public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
-                
rebalanceListener.getValue().onPartitionsAssigned(Arrays.asList(TOPIC_PARTITION,
 TOPIC_PARTITION2));
-                return ConsumerRecords.empty();
-            }
-        });
-        
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
-        
EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
-
-        sinkTask.initialize(EasyMock.capture(sinkTaskContext));
-        PowerMock.expectLastCall();
-        sinkTask.start(TASK_PROPS);
-        PowerMock.expectLastCall();
-    }
-
-    private void expectConsumerPoll(final int numMessages) {
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
-                new IAnswer<ConsumerRecords<byte[], byte[]>>() {
-                    @Override
-                    public ConsumerRecords<byte[], byte[]> answer() throws 
Throwable {
-                        List<ConsumerRecord<byte[], byte[]>> records = new 
ArrayList<>();
-                        for (int i = 0; i < numMessages; i++)
-                            records.add(new ConsumerRecord<>(TOPIC, PARTITION, 
FIRST_OFFSET + recordsReturned + i, RAW_KEY, RAW_VALUE));
-                        recordsReturned += numMessages;
-                        return new ConsumerRecords<>(
-                                numMessages > 0 ?
-                                        Collections.singletonMap(new 
TopicPartition(TOPIC, PARTITION), records) :
-                                        Collections.<TopicPartition, 
List<ConsumerRecord<byte[], byte[]>>>emptyMap()
-                        );
-                    }
-                });
-    }
-
-    private void expectConvertMessages(final int numMessages) {
-        EasyMock.expect(keyConverter.toCopycatData(TOPIC, 
RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).times(numMessages);
-        EasyMock.expect(valueConverter.toCopycatData(TOPIC, 
RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, 
VALUE)).times(numMessages);
-    }
-}

Reply via email to