KAFKA-2366; Initial patch for Copycat

This is an initial patch implementing the basics of Copycat for KIP-26.

The intent here is to start a review of the key pieces of the core API and get 
a reasonably functional, baseline, non-distributed implementation of Copycat in 
place to get things rolling. The current patch has a number of known issues 
that need to be addressed before a final version:

* Some build-related issues. Specifically, the build requires some 
locally-installed dependencies (see below) and ignores checkstyle for the 
runtime data library, which is currently lifted from Avro and likely won't last 
in its current form. Some Gradle task dependencies also aren't quite right 
because I haven't gotten rid of the dependency on `core` (which should now be an 
easy patch since new consumer groups are in a much better state).
* This patch currently depends on some Confluent trunk code because I 
prototyped using our Avro serializers with schema-registry support. We need to 
figure out what we want to provide as an example built-in set of serializers. 
Unlike core Kafka, where we could sidestep the issue by providing only ByteArray 
or String serializers, serialization is central to how Copycat works.
* This patch uses a hacked-up version of Avro as its runtime data format. I'm 
not sure we want to go through the entire API discussion just to get some basic 
code committed, so I filed KAFKA-2367 to handle that separately. The core 
connector APIs and the runtime data APIs are entirely orthogonal.
* This patch needs some updates to get aligned with recent new consumer changes 
(specifically, I'm aware of the ConcurrentModificationException issue on exit). 
More generally, the new consumer is in flux but Copycat depends on it, so there 
are likely to be some negative interactions.
* The layout feels a bit awkward to me right now because I ported it from a 
Maven layout. We don't have nearly the same level of granularity in Kafka 
currently (core and clients, plus the mostly ignored examples, log4j-appender, 
and a couple of contribs). We might want to reorganize, although keeping 
data+api separate from runtime and connector plugins is useful for minimizing 
dependencies.
* There are a variety of other things (e.g., I'm not happy with the exception 
hierarchy or how exceptions are currently handled, TopicPartition doesn't really 
need to be duplicated unless we want Copycat entirely isolated from the Kafka 
APIs, etc.), but I expect we'll cover those in the review.

Before commenting on the patch, it's probably worth reviewing 
https://issues.apache.org/jira/browse/KAFKA-2365 and 
https://issues.apache.org/jira/browse/KAFKA-2366 to get an idea of what I had 
in mind for a) what we ultimately want with all the Copycat patches and b) what 
we aim to cover in this initial patch. My hope is that we can use a WIP patch 
(after the current obvious deficiencies are addressed) while recognizing that 
we want to make iterative progress with a bunch of subsequent PRs.

Author: Ewen Cheslack-Postava <m...@ewencp.org>

Reviewers: Ismael Juma, Gwen Shapira

Closes #99 from ewencp/copycat and squashes the following commits:

a3a47a6 [Ewen Cheslack-Postava] Simplify Copycat exceptions, make them a 
subclass of KafkaException.
8c108b0 [Ewen Cheslack-Postava] Rename Coordinator to Herder to avoid confusion 
with the consumer coordinator.
7bf8075 [Ewen Cheslack-Postava] Make Copycat CLI specific to standalone mode, 
clean up some config and get rid of config storage in standalone mode.
656a003 [Ewen Cheslack-Postava] Clarify and expand the explanation of the 
Copycat Coordinator interface.
c0e5fdc [Ewen Cheslack-Postava] Merge remote-tracking branch 'origin/trunk' 
into copycat
0fa7a36 [Ewen Cheslack-Postava] Mark Copycat classes as unstable and reduce 
visibility of some classes where possible.
d55d31e [Ewen Cheslack-Postava] Reorganize Copycat code to put it all under one 
top-level directory.
b29cb2c [Ewen Cheslack-Postava] Merge remote-tracking branch 'origin/trunk' 
into copycat
d713a21 [Ewen Cheslack-Postava] Address Gwen's review comments.
6787a85 [Ewen Cheslack-Postava] Make Converter generic to match serializers 
since some serialization formats do not require a base class of Object; update 
many other classes to have generic key and value class type parameters to match 
this change.
b194c73 [Ewen Cheslack-Postava] Split Copycat converter option into two options 
for key and value.
0b5a1a0 [Ewen Cheslack-Postava] Normalize naming to use partition for both 
source and Kafka, adjusting naming in CopycatRecord classes to clearly 
differentiate.
e345142 [Ewen Cheslack-Postava] Remove Copycat reflection utils, use existing 
Utils and ConfigDef functionality from clients package.
be5c387 [Ewen Cheslack-Postava] Minor cleanup
122423e [Ewen Cheslack-Postava] Style cleanup
6ba87de [Ewen Cheslack-Postava] Remove most of the Avro-based mock runtime data 
API, only preserving enough schema functionality to support basic primitive 
types for an initial patch.
4674d13 [Ewen Cheslack-Postava] Address review comments, clean up some code 
styling.
25b5739 [Ewen Cheslack-Postava] Fix sink task offset commit concurrency issue 
by moving it to the worker thread and waking up the consumer to ensure it exits 
promptly.
0aefe21 [Ewen Cheslack-Postava] Add log4j settings for Copycat.
220e42d [Ewen Cheslack-Postava] Replace Avro serializer with JSON serializer.
1243a7c [Ewen Cheslack-Postava] Merge remote-tracking branch 'origin/trunk' 
into copycat
5a618c6 [Ewen Cheslack-Postava] Remove offset serializers, instead reusing the 
existing serializers and removing schema projection support.
e849e10 [Ewen Cheslack-Postava] Remove duplicated TopicPartition implementation.
dec1379 [Ewen Cheslack-Postava] Switch to using new consumer coordinator 
instead of manually assigning partitions. Remove dependency of copycat-runtime 
on core.
4a9b4f3 [Ewen Cheslack-Postava] Add some helpful Copycat-specific build and 
test targets that cover all Copycat packages.
31cd1ca [Ewen Cheslack-Postava] Add CLI tools for Copycat.
e14942c [Ewen Cheslack-Postava] Add Copycat file connector.
0233456 [Ewen Cheslack-Postava] Add copycat-avro and copycat-runtime
11981d2 [Ewen Cheslack-Postava] Add copycat-data and copycat-api


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f6acfb08
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f6acfb08
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f6acfb08

Branch: refs/heads/trunk
Commit: f6acfb08917946f15cb8de2fee786019124af212
Parents: c8e62c9
Author: Ewen Cheslack-Postava <m...@ewencp.org>
Authored: Fri Aug 14 16:00:51 2015 -0700
Committer: Gwen Shapira <csh...@gmail.com>
Committed: Fri Aug 14 16:00:51 2015 -0700

----------------------------------------------------------------------
 bin/copycat-standalone.sh                       |   23 +
 bin/kafka-run-class.sh                          |    8 +
 build.gradle                                    |  269 +-
 checkstyle/import-control.xml                   |   57 +
 .../org/apache/kafka/common/utils/Utils.java    |   15 +-
 config/copycat-console-sink.properties          |   19 +
 config/copycat-console-source.properties        |   19 +
 config/copycat-file-sink.properties             |   20 +
 config/copycat-file-source.properties           |   20 +
 config/copycat-log4j.properties                 |   23 +
 config/copycat-standalone.properties            |   28 +
 .../kafka/copycat/connector/Connector.java      |  117 +
 .../copycat/connector/ConnectorContext.java     |   33 +
 .../kafka/copycat/connector/CopycatRecord.java  |  103 +
 .../apache/kafka/copycat/connector/Task.java    |   49 +
 .../kafka/copycat/errors/CopycatException.java  |   40 +
 .../kafka/copycat/sink/SinkConnector.java       |   40 +
 .../apache/kafka/copycat/sink/SinkRecord.java   |   71 +
 .../org/apache/kafka/copycat/sink/SinkTask.java |   64 +
 .../kafka/copycat/sink/SinkTaskContext.java     |   59 +
 .../kafka/copycat/source/SourceConnector.java   |   29 +
 .../kafka/copycat/source/SourceRecord.java      |  103 +
 .../apache/kafka/copycat/source/SourceTask.java |   62 +
 .../kafka/copycat/source/SourceTaskContext.java |   40 +
 .../apache/kafka/copycat/storage/Converter.java |   45 +
 .../copycat/storage/OffsetStorageReader.java    |   59 +
 .../kafka/copycat/util/ConnectorUtils.java      |   66 +
 .../connector/ConnectorReconfigurationTest.java |   76 +
 .../kafka/copycat/util/ConnectorUtilsTest.java  |   67 +
 .../copycat/data/DataRuntimeException.java      |   36 +
 .../kafka/copycat/data/DataTypeException.java   |   33 +
 .../kafka/copycat/data/ObjectProperties.java    |   85 +
 .../org/apache/kafka/copycat/data/Schema.java   | 1054 ++++++++
 .../kafka/copycat/data/SchemaBuilder.java       | 2415 ++++++++++++++++++
 .../copycat/data/SchemaParseException.java      |   32 +
 .../copycat/file/FileStreamSinkConnector.java   |   62 +
 .../kafka/copycat/file/FileStreamSinkTask.java  |   79 +
 .../copycat/file/FileStreamSourceConnector.java |   70 +
 .../copycat/file/FileStreamSourceTask.java      |  176 ++
 .../file/FileStreamSinkConnectorTest.java       |   85 +
 .../copycat/file/FileStreamSinkTaskTest.java    |   67 +
 .../file/FileStreamSourceConnectorTest.java     |  104 +
 .../copycat/file/FileStreamSourceTaskTest.java  |  140 +
 .../kafka/copycat/json/JsonConverter.java       |  265 ++
 .../kafka/copycat/json/JsonDeserializer.java    |   87 +
 .../apache/kafka/copycat/json/JsonSchema.java   |  114 +
 .../kafka/copycat/json/JsonSerializer.java      |   72 +
 .../kafka/copycat/json/JsonConverterTest.java   |  173 ++
 .../kafka/copycat/cli/CopycatStandalone.java    |   87 +
 .../apache/kafka/copycat/cli/WorkerConfig.java  |  141 +
 .../kafka/copycat/runtime/ConnectorConfig.java  |   87 +
 .../apache/kafka/copycat/runtime/Copycat.java   |   94 +
 .../apache/kafka/copycat/runtime/Herder.java    |   67 +
 .../copycat/runtime/SinkTaskContextImpl.java    |   24 +
 .../runtime/SourceTaskOffsetCommitter.java      |  103 +
 .../apache/kafka/copycat/runtime/Worker.java    |  236 ++
 .../kafka/copycat/runtime/WorkerSinkTask.java   |  226 ++
 .../copycat/runtime/WorkerSinkTaskThread.java   |  112 +
 .../kafka/copycat/runtime/WorkerSourceTask.java |  310 +++
 .../kafka/copycat/runtime/WorkerTask.java       |   54 +
 .../standalone/StandaloneConnectorContext.java  |   42 +
 .../runtime/standalone/StandaloneHerder.java    |  257 ++
 .../copycat/storage/FileOffsetBackingStore.java |  111 +
 .../storage/MemoryOffsetBackingStore.java       |  113 +
 .../copycat/storage/OffsetBackingStore.java     |   74 +
 .../storage/OffsetStorageReaderImpl.java        |  114 +
 .../copycat/storage/OffsetStorageWriter.java    |  208 ++
 .../org/apache/kafka/copycat/util/Callback.java |   31 +
 .../kafka/copycat/util/ConnectorTaskId.java     |   71 +
 .../kafka/copycat/util/FutureCallback.java      |   76 +
 .../kafka/copycat/util/ShutdownableThread.java  |  145 ++
 .../copycat/runtime/WorkerSinkTaskTest.java     |  367 +++
 .../copycat/runtime/WorkerSourceTaskTest.java   |  279 ++
 .../kafka/copycat/runtime/WorkerTest.java       |  179 ++
 .../standalone/StandaloneHerderTest.java        |  186 ++
 .../storage/FileOffsetBackingStoreTest.java     |  117 +
 .../storage/OffsetStorageWriterTest.java        |  242 ++
 .../org/apache/kafka/copycat/util/MockTime.java |   49 +
 .../copycat/util/ShutdownableThreadTest.java    |   72 +
 .../TestBackgroundThreadExceptionHandler.java   |   37 +
 .../apache/kafka/copycat/util/ThreadedTest.java |   43 +
 settings.gradle                                 |    4 +-
 82 files changed, 11116 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/bin/copycat-standalone.sh
----------------------------------------------------------------------
diff --git a/bin/copycat-standalone.sh b/bin/copycat-standalone.sh
new file mode 100755
index 0000000..b219f8a
--- /dev/null
+++ b/bin/copycat-standalone.sh
@@ -0,0 +1,23 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname $0)
+
+if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
+    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/copycat-log4j.properties"
+fi
+
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.copycat.cli.CopycatStandalone "$@"

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 2f00f68..b689b2e 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -66,6 +66,14 @@ do
   CLASSPATH=$CLASSPATH:$file
 done
 
+for cc_pkg in "data" "api" "runtime" "file" "json"
+do
+  for file in $base_dir/copycat/${cc_pkg}/build/libs/copycat-${cc_pkg}*.jar $base_dir/copycat/${cc_pkg}/build/dependant-libs/*.jar;
+  do
+    CLASSPATH=$CLASSPATH:$file
+  done
+done
+
 # classpath addition for release
 for file in $base_dir/libs/*.jar;
 do

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 1b67e62..864427b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -28,6 +28,11 @@ buildscript {
 }
 
 def slf4jlog4j='org.slf4j:slf4j-log4j12:1.7.6'
+def slf4japi="org.slf4j:slf4j-api:1.7.6"
+def junit='junit:junit:4.6'
+def easymock='org.easymock:easymock:3.3.1'
+def powermock='org.powermock:powermock-module-junit4:1.6.2'
+def powermock_easymock='org.powermock:powermock-api-easymock:1.6.2'
 
 allprojects {
   apply plugin: 'idea'
@@ -59,7 +64,7 @@ rat {
   // And some of the files that we have checked in should also be excluded from this check
   excludes.addAll([
     '**/.git/**',
-    'build/**',
+    '**/build/**',
     'CONTRIBUTING.md',
     'gradlew',
     'gradlew.bat',
@@ -204,20 +209,25 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
   }
 }
 
-tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar', 'tools:jar']) {
+def copycatPkgs = ['copycat:data', 'copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file']
+def pkgs = ['clients', 'examples', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'log4j-appender', 'tools'] + copycatPkgs
+
+tasks.create(name: "jarCopycat", dependsOn: copycatPkgs.collect { it + ":jar" }) {}
+tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7'] + pkgs.collect { it + ":jar" }) {
 }
 
-tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_10_5', 'srcJar_2_11_7', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar', 'tools:srcJar']) { }
+tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_10_5', 'srcJar_2_11_7'] + pkgs.collect { it + ":srcJar" }) { }
 
-tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_10_5', 'docsJar_2_11_7', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar', 'tools:docsJar']) { }
+tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_10_5', 'docsJar_2_11_7'] + pkgs.collect { it + ":docsJar" }) { }
 
-tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7', 'clients:test', 'log4j-appender:test', 'tools:test']) {
+tasks.create(name: "testCopycat", dependsOn: copycatPkgs.collect { it + ":test" }) {}
+tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7'] + pkgs.collect { it + ":test" }) {
 }
 
 tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_10_5', 'releaseTarGz_2_11_7']) {
 }
 
-tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives', 'tools:uploadArchives']) {
+tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7'] + pkgs.collect { it + ":uploadArchives" }) {
 }
 
 project(':core') {
@@ -239,8 +249,8 @@ project(':core') {
       compile 'org.scala-lang.modules:scala-parser-combinators_2.11:1.0.4'
     }
 
-    testCompile 'junit:junit:4.6'
-    testCompile 'org.easymock:easymock:3.0'
+    testCompile "$junit"
+    testCompile "$easymock"
     testCompile 'org.objenesis:objenesis:1.2'
     testCompile "org.scalatest:scalatest_$baseScalaVersion:2.2.5"
 
@@ -371,11 +381,11 @@ project(':clients') {
   archivesBaseName = "kafka-clients"
 
   dependencies {
-    compile "org.slf4j:slf4j-api:1.7.6"
+    compile "$slf4japi"
     compile 'org.xerial.snappy:snappy-java:1.1.1.7'
     compile 'net.jpountz.lz4:lz4:1.2.0'
 
-    testCompile 'junit:junit:4.6'
+    testCompile "$junit"
     testRuntime "$slf4jlog4j"
   }
 
@@ -423,7 +433,7 @@ project(':tools') {
         compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4'
         compile "$slf4jlog4j"
 
-        testCompile 'junit:junit:4.6'
+        testCompile "$junit"
         testCompile project(path: ':clients', configuration: 'archives')
     }
 
@@ -471,7 +481,7 @@ project(':log4j-appender') {
     compile project(':clients')
     compile "$slf4jlog4j"
 
-    testCompile 'junit:junit:4.6'
+    testCompile "$junit"
     testCompile project(path: ':clients', configuration: 'archives')
   }
 
@@ -496,3 +506,238 @@ project(':log4j-appender') {
   }
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 }
+
+project(':copycat:data') {
+  apply plugin: 'checkstyle'
+  archivesBaseName = "copycat-data"
+
+  dependencies {
+    compile project(':clients')
+    compile "$slf4japi"
+
+    testCompile "$junit"
+    testRuntime "$slf4jlog4j"
+  }
+
+  task testJar(type: Jar) {
+    classifier = 'test'
+    from sourceSets.test.output
+  }
+
+  test {
+    testLogging {
+      events "passed", "skipped", "failed"
+      exceptionFormat = 'full'
+    }
+  }
+
+  javadoc {
+    include "**/org/apache/kafka/copycat/data/*"
+  }
+
+  artifacts {
+    archives testJar
+  }
+
+  configurations {
+    archives.extendsFrom (testCompile)
+  }
+
+  /* FIXME Re-enable this with KAFKA-2367 when the placeholder data API is replaced
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
+  test.dependsOn('checkstyleMain', 'checkstyleTest') */
+}
+
+project(':copycat:api') {
+  apply plugin: 'checkstyle'
+  archivesBaseName = "copycat-api"
+
+  dependencies {
+    compile project(':copycat:data')
+    compile "$slf4japi"
+
+    testCompile "$junit"
+    testRuntime "$slf4jlog4j"
+  }
+
+  task testJar(type: Jar) {
+    classifier = 'test'
+    from sourceSets.test.output
+  }
+
+  test {
+    testLogging {
+      events "passed", "skipped", "failed"
+      exceptionFormat = 'full'
+    }
+  }
+
+  javadoc {
+    include "**/org/apache/kafka/copycat/*"
+  }
+
+  artifacts {
+    archives testJar
+  }
+
+  configurations {
+    archives.extendsFrom (testCompile)
+  }
+
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
+  test.dependsOn('checkstyleMain', 'checkstyleTest')
+}
+
+project(':copycat:json') {
+  apply plugin: 'checkstyle'
+  archivesBaseName = "copycat-json"
+
+  dependencies {
+    compile project(':copycat:api')
+    compile "$slf4japi"
+    compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4'
+
+    testCompile "$junit"
+    testCompile "$easymock"
+    testCompile "$powermock"
+    testCompile "$powermock_easymock"
+    testRuntime "$slf4jlog4j"
+  }
+
+  task testJar(type: Jar) {
+    classifier = 'test'
+    from sourceSets.test.output
+  }
+
+  test {
+    testLogging {
+      events "passed", "skipped", "failed"
+      exceptionFormat = 'full'
+    }
+  }
+
+  javadoc {
+    include "**/org/apache/kafka/copycat/*"
+  }
+
+  artifacts {
+    archives testJar
+  }
+
+  configurations {
+    archives.extendsFrom(testCompile)
+  }
+
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
+  test.dependsOn('checkstyleMain', 'checkstyleTest')
+
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.runtime) {
+      exclude('kafka-clients*')
+      exclude('copycat-*')
+    }
+    into "$buildDir/dependant-libs"
+  }
+
+  jar {
+    dependsOn copyDependantLibs
+  }
+}
+
+project(':copycat:runtime') {
+  apply plugin: 'checkstyle'
+  archivesBaseName = "copycat-runtime"
+
+  dependencies {
+    compile project(':copycat:api')
+    compile project(':clients')
+    compile "$slf4japi"
+
+    testCompile "$junit"
+    testCompile "$easymock"
+    testCompile "$powermock"
+    testCompile "$powermock_easymock"
+    testRuntime "$slf4jlog4j"
+    testRuntime project(":copycat:json")
+  }
+
+  task testJar(type: Jar) {
+    classifier = 'test'
+    from sourceSets.test.output
+  }
+
+  test {
+    testLogging {
+      events "passed", "skipped", "failed"
+      exceptionFormat = 'full'
+    }
+  }
+
+  javadoc {
+    include "**/org/apache/kafka/copycat/*"
+  }
+
+  artifacts {
+    archives testJar
+  }
+
+  configurations {
+    archives.extendsFrom(testCompile)
+  }
+
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
+  test.dependsOn('checkstyleMain', 'checkstyleTest')
+}
+
+project(':copycat:file') {
+  apply plugin: 'checkstyle'
+  archivesBaseName = "copycat-file"
+
+  dependencies {
+    compile project(':copycat:api')
+    compile "$slf4japi"
+
+    testCompile "$junit"
+    testCompile "$easymock"
+    testCompile "$powermock"
+    testCompile "$powermock_easymock"
+    testRuntime "$slf4jlog4j"
+  }
+
+  task testJar(type: Jar) {
+    classifier = 'test'
+    from sourceSets.test.output
+  }
+
+  test {
+    testLogging {
+      events "passed", "skipped", "failed"
+      exceptionFormat = 'full'
+    }
+  }
+
+  javadoc {
+    include "**/org/apache/kafka/copycat/*"
+  }
+
+  artifacts {
+    archives testJar
+  }
+
+  configurations {
+    archives.extendsFrom(testCompile)
+  }
+
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
+  test.dependsOn('checkstyleMain', 'checkstyleTest')
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 18be1bb..e3f4f84 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -108,4 +108,61 @@
                <allow pkg="org.apache.kafka" />
        </subpackage>
 
+       <subpackage name="copycat">
+               <allow pkg="org.apache.kafka.common" />
+               <allow pkg="org.apache.kafka.copycat.data" />
+               <allow pkg="org.apache.kafka.copycat.errors" />
+
+               <subpackage name="source">
+                       <allow pkg="org.apache.kafka.copycat.connector" />
+                       <allow pkg="org.apache.kafka.copycat.storage" />
+               </subpackage>
+
+               <subpackage name="sink">
+                       <allow pkg="org.apache.kafka.copycat.connector" />
+                       <allow pkg="org.apache.kafka.copycat.storage" />
+               </subpackage>
+
+               <subpackage name="runtime">
+                       <allow pkg="org.apache.kafka.copycat" />
+                       <allow pkg="org.apache.kafka.clients" />
+                       <!-- for tests -->
+                       <allow pkg="org.easymock" />
+                       <allow pkg="org.powermock" />
+               </subpackage>
+
+               <subpackage name="cli">
+                       <allow pkg="org.apache.kafka.copycat.runtime" />
+                       <allow pkg="org.apache.kafka.copycat.util" />
+                       <allow pkg="org.apache.kafka.common" />
+               </subpackage>
+
+               <subpackage name="storage">
+                       <allow pkg="org.apache.kafka.copycat" />
+                       <allow pkg="org.apache.kafka.common.serialization" />
+                       <!-- for tests -->
+                       <allow pkg="org.easymock" />
+                       <allow pkg="org.powermock" />
+               </subpackage>
+
+               <subpackage name="util">
+                       <allow pkg="org.apache.kafka.copycat" />
+               </subpackage>
+
+               <subpackage name="json">
+                       <allow pkg="com.fasterxml.jackson" />
+                       <allow pkg="org.apache.kafka.common.serialization" />
+                       <allow pkg="org.apache.kafka.common.errors" />
+                       <allow pkg="org.apache.kafka.copycat.storage" />
+               </subpackage>
+
+               <subpackage name="file">
+                       <allow pkg="org.apache.kafka.copycat" />
+                       <!-- for tests -->
+                       <allow pkg="org.easymock" />
+                       <allow pkg="org.powermock" />
+               </subpackage>
+
+       </subpackage>
+
 </import-control>

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index af9993c..80a914e 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -271,17 +271,30 @@ public class Utils {
     /**
      * Instantiate the class
      */
-    public static Object newInstance(Class<?> c) {
+    public static <T> T newInstance(Class<T> c) {
         try {
             return c.newInstance();
         } catch (IllegalAccessException e) {
             throw new KafkaException("Could not instantiate class " + c.getName(), e);
         } catch (InstantiationException e) {
             throw new KafkaException("Could not instantiate class " + c.getName() + " Does it have a public no-argument constructor?", e);
+        } catch (NullPointerException e) {
+            throw new KafkaException("Requested class was null", e);
         }
     }
 
     /**
+     * Look up the class by name and instantiate it.
+     * @param klass class name
+     * @param base super class of the class to be instantiated
+     * @param <T>
+     * @return the new instance
+     */
+    public static <T> T newInstance(String klass, Class<T> base) throws ClassNotFoundException {
+        return Utils.newInstance(Class.forName(klass).asSubclass(base));
+    }
+
+    /**
      * Generates 32 bit murmur2 hash from byte array
      * @param data byte array to hash
      * @return 32 bit hash of the given array
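
The hunk above makes `newInstance` generic and adds an overload that looks a class up by name. As a rough, stand-alone sketch of that pattern (not the patch's code): the class name `NewInstanceSketch` is hypothetical, `RuntimeException` and `IllegalArgumentException` stand in for `KafkaException` so the example has no Kafka dependency, and `getDeclaredConstructor().newInstance()` with an explicit null check is swapped in for the patch's `c.newInstance()` and its `NullPointerException` catch.

```java
import java.util.List;

// Hypothetical stand-alone class for illustration; the patch itself puts this
// logic in org.apache.kafka.common.utils.Utils and throws KafkaException.
class NewInstanceSketch {

    // Instantiate a class through its no-argument constructor.
    static <T> T newInstance(Class<T> c) {
        if (c == null)
            throw new IllegalArgumentException("Requested class was null");
        try {
            return c.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Could not instantiate class " + c.getName(), e);
        }
    }

    // Look up a class by name, check that it is a subtype of base, then instantiate it.
    // asSubclass throws ClassCastException if the named class does not extend/implement base.
    static <T> T newInstance(String klass, Class<T> base) throws ClassNotFoundException {
        return newInstance(Class.forName(klass).asSubclass(base));
    }

    public static void main(String[] args) throws Exception {
        List<?> list = newInstance("java.util.ArrayList", List.class);
        System.out.println(list.getClass().getName());

        try {
            newInstance("java.lang.String", List.class);
        } catch (ClassCastException e) {
            System.out.println("rejected: String is not a List");
        }
    }
}
```

The `asSubclass` call is what lets the caller get back a `T` without an unchecked cast: a class name of the wrong type fails at lookup time rather than later at the point of use.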

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/config/copycat-console-sink.properties
----------------------------------------------------------------------
diff --git a/config/copycat-console-sink.properties b/config/copycat-console-sink.properties
new file mode 100644
index 0000000..4cd4c33
--- /dev/null
+++ b/config/copycat-console-sink.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=local-console-sink
+connector.class=org.apache.kafka.copycat.file.FileStreamSinkConnector
+tasks.max=1
+topics=test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/config/copycat-console-source.properties
----------------------------------------------------------------------
diff --git a/config/copycat-console-source.properties b/config/copycat-console-source.properties
new file mode 100644
index 0000000..17dbbf9
--- /dev/null
+++ b/config/copycat-console-source.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=local-console-source
+connector.class=org.apache.kafka.copycat.file.FileStreamSourceConnector
+tasks.max=1
+topic=test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/config/copycat-file-sink.properties
----------------------------------------------------------------------
diff --git a/config/copycat-file-sink.properties b/config/copycat-file-sink.properties
new file mode 100644
index 0000000..3cc0d62
--- /dev/null
+++ b/config/copycat-file-sink.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=local-file-sink
+connector.class=org.apache.kafka.copycat.file.FileStreamSinkConnector
+tasks.max=1
+file=test.sink.txt
+topics=test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/config/copycat-file-source.properties
----------------------------------------------------------------------
diff --git a/config/copycat-file-source.properties 
b/config/copycat-file-source.properties
new file mode 100644
index 0000000..7512e50
--- /dev/null
+++ b/config/copycat-file-source.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=local-file-source
+connector.class=org.apache.kafka.copycat.file.FileStreamSourceConnector
+tasks.max=1
+file=test.txt
+topic=test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/config/copycat-log4j.properties
----------------------------------------------------------------------
diff --git a/config/copycat-log4j.properties b/config/copycat-log4j.properties
new file mode 100644
index 0000000..158daed
--- /dev/null
+++ b/config/copycat-log4j.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
+
+log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.I0Itec.zkclient=ERROR
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/config/copycat-standalone.properties
----------------------------------------------------------------------
diff --git a/config/copycat-standalone.properties 
b/config/copycat-standalone.properties
new file mode 100644
index 0000000..cf3b268
--- /dev/null
+++ b/config/copycat-standalone.properties
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# These are defaults. This file just demonstrates how to override some settings.
+bootstrap.servers=localhost:9092
+
+key.converter=org.apache.kafka.copycat.json.JsonConverter
+value.converter=org.apache.kafka.copycat.json.JsonConverter
+key.serializer=org.apache.kafka.copycat.json.JsonSerializer
+value.serializer=org.apache.kafka.copycat.json.JsonSerializer
+key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
+value.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
+
+offset.storage.file.filename=/tmp/copycat.offsets
+# Flush much faster than normal, which is useful for testing/debugging
+offset.flush.interval.ms=10000

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java 
b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
new file mode 100644
index 0000000..2ea3c95
--- /dev/null
+++ 
b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.connector;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * <p>
+ * Connectors manage integration of Copycat with another system, either as an 
input that ingests
+ * data into Kafka or an output that passes data to an external system. 
Implementations should
+ * not use this class directly; they should inherit from SourceConnector or 
SinkConnector.
+ * </p>
+ * <p>
+ * Connectors have two primary tasks. First, given some configuration, they 
are responsible for
+ * creating configurations for a set of {@link Task}s that split up the data 
processing. For
+ * example, a database Connector might create Tasks by dividing the set of 
tables evenly among
+ * tasks. Second, they are responsible for monitoring inputs for changes that 
require
+ * reconfiguration and notifying the Copycat runtime via the ConnectorContext. 
Continuing the
+ * previous example, the connector might periodically check for new tables and 
notify Copycat of
+ * additions and deletions. Copycat will then request new configurations and 
update the running
+ * Tasks.
+ * </p>
+ */
+@InterfaceStability.Unstable
+public abstract class Connector {
+
+    protected ConnectorContext context;
+
+    /**
+     * Initialize this connector, using the provided ConnectorContext to 
notify the runtime of
+     * input configuration changes.
+     * @param ctx context object used to interact with the Copycat runtime
+     */
+    public void initialize(ConnectorContext ctx) {
+        context = ctx;
+    }
+
+    /**
+     * <p>
+     * Initialize this connector, using the provided ConnectorContext to 
notify the runtime of
+     * input configuration changes and using the provided set of Task 
configurations.
+     * This version is only used to recover from failures.
+     * </p>
+     * <p>
+     * The default implementation ignores the provided Task configurations. 
During recovery, Copycat will request
+     * an updated set of configurations and update the running Tasks 
appropriately. However, Connectors should
+     * implement special handling of this case if it will avoid unnecessary 
changes to running Tasks.
+     * </p>
+     *
+     * @param ctx context object used to interact with the Copycat runtime
+     * @param taskConfigs existing task configurations, which may be used when 
generating new task configs to avoid
+     *                    churn in partition to task assignments
+     */
+    public void initialize(ConnectorContext ctx, List<Properties> taskConfigs) 
{
+        context = ctx;
+        // Ignore taskConfigs. May result in more churn of tasks during 
recovery if updated configs
+        // are very different, but reduces the difficulty of implementing a 
Connector
+    }
+
+    /**
+     * Start this Connector. This method will only be called on a clean 
Connector, i.e. it has
+     * either just been instantiated and initialized or {@link #stop()} has 
been invoked.
+     *
+     * @param props configuration settings
+     */
+    public abstract void start(Properties props);
+
+    /**
+     * Reconfigure this Connector. Most implementations will not override 
this, using the default
+     * implementation that calls {@link #stop()} followed by {@link 
#start(Properties)}.
+     * Implementations only need to override this if they want to handle this 
process more
+     * efficiently, e.g. without shutting down network connections to the 
external system.
+     *
+     * @param props new configuration settings
+     */
+    public void reconfigure(Properties props) {
+        stop();
+        start(props);
+    }
+
+    /**
+     * Returns the Task implementation for this Connector.
+     */
+    public abstract Class<? extends Task> getTaskClass();
+
+    /**
+     * Returns a set of configurations for Tasks based on the current configuration,
+     * producing at most maxTasks configurations.
+     *
+     * @param maxTasks maximum number of configurations to generate
+     * @return configurations for Tasks
+     */
+    public abstract List<Properties> getTaskConfigs(int maxTasks);
+
+    /**
+     * Stop this connector.
+     */
+    public abstract void stop();
+}
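
Illustration (not part of the patch): the Connector Javadoc above describes a database connector that divides its tables evenly among tasks. A minimal sketch of how a getTaskConfigs(int maxTasks) implementation might do that is shown below. The class name TableAssignment and the "tables" config key are hypothetical and chosen only for this example; the patch does not define them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of this patch: splits table names across task configs.
class TableAssignment {
    // Assumed config key, used only for this illustration.
    static final String TABLES_CONFIG = "tables";

    // Produces at most maxTasks configurations, never more than one per table.
    static List<Properties> taskConfigs(List<String> tables, int maxTasks) {
        List<Properties> configs = new ArrayList<>();
        int numTasks = Math.min(tables.size(), maxTasks);
        for (int i = 0; i < numTasks; i++) {
            // Round-robin: task i gets tables i, i + numTasks, i + 2 * numTasks, ...
            List<String> assigned = new ArrayList<>();
            for (int j = i; j < tables.size(); j += numTasks)
                assigned.add(tables.get(j));
            Properties props = new Properties();
            props.setProperty(TABLES_CONFIG, String.join(",", assigned));
            configs.add(props);
        }
        return configs;
    }
}
```

A real connector would presumably call something like this from getTaskConfigs and call context.requestTaskReconfiguration() when the set of tables changes, but that wiring is left out here.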

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/connector/ConnectorContext.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/ConnectorContext.java
 
b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/ConnectorContext.java
new file mode 100644
index 0000000..ecba69a
--- /dev/null
+++ 
b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/ConnectorContext.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.connector;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * ConnectorContext allows Connectors to proactively interact with the Copycat 
runtime.
+ */
+@InterfaceStability.Unstable
+public interface ConnectorContext {
+    /**
+     * Requests that the runtime reconfigure the Tasks for this connector. This should be used to
+     * indicate to the runtime that something about the input/output has changed (e.g. partitions
+     * added/removed) and the running Tasks will need to be modified.
+     */
+    void requestTaskReconfiguration();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java
 
b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java
new file mode 100644
index 0000000..576904a
--- /dev/null
+++ 
b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.connector;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * <p>
+ * Base class for records containing data to be copied to/from Kafka. This 
corresponds closely to
+ * Kafka's ProducerRecord and ConsumerRecord classes, and holds the data that 
may be used by both
+ * sources and sinks (topic, kafkaPartition, key, value). Although both 
implementations include a
+ * notion of offset, it is not included here because they differ in type.
+ * </p>
+ */
+@InterfaceStability.Unstable
+public abstract class CopycatRecord {
+    private final String topic;
+    private final Integer kafkaPartition;
+    private final Object key;
+    private final Object value;
+
+    public CopycatRecord(String topic, Integer kafkaPartition, Object value) {
+        this(topic, kafkaPartition, null, value);
+    }
+
+    public CopycatRecord(String topic, Integer kafkaPartition, Object key, 
Object value) {
+        this.topic = topic;
+        this.kafkaPartition = kafkaPartition;
+        this.key = key;
+        this.value = value;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public Integer getKafkaPartition() {
+        return kafkaPartition;
+    }
+
+    public Object getKey() {
+        return key;
+    }
+
+    public Object getValue() {
+        return value;
+    }
+
+    @Override
+    public String toString() {
+        return "CopycatRecord{" +
+                "topic='" + topic + '\'' +
+                ", kafkaPartition=" + kafkaPartition +
+                ", key=" + key +
+                ", value=" + value +
+                '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        CopycatRecord that = (CopycatRecord) o;
+
+        if (key != null ? !key.equals(that.key) : that.key != null)
+            return false;
+        if (kafkaPartition != null ? 
!kafkaPartition.equals(that.kafkaPartition) : that.kafkaPartition != null)
+            return false;
+        if (topic != null ? !topic.equals(that.topic) : that.topic != null)
+            return false;
+        if (value != null ? !value.equals(that.value) : that.value != null)
+            return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = topic != null ? topic.hashCode() : 0;
+        result = 31 * result + (kafkaPartition != null ? 
kafkaPartition.hashCode() : 0);
+        result = 31 * result + (key != null ? key.hashCode() : 0);
+        result = 31 * result + (value != null ? value.hashCode() : 0);
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java 
b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
new file mode 100644
index 0000000..cdaba08
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.connector;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Properties;
+
+/**
+ * <p>
+ * Tasks contain the code that actually copies data to/from another system. 
They receive
+ * a configuration from their parent Connector, assigning them a fraction of a 
Copycat job's work.
+ * The Copycat framework then pushes/pulls data from the Task. The Task must 
also be able to
+ * respond to reconfiguration requests.
+ * </p>
+ * <p>
+ * Task only contains the minimal shared functionality between
+ * {@link org.apache.kafka.copycat.source.SourceTask} and
+ * {@link org.apache.kafka.copycat.sink.SinkTask}.
+ * </p>
+ */
+@InterfaceStability.Unstable
+public interface Task {
+    /**
+     * Start the Task
+     * @param props initial configuration
+     */
+    void start(Properties props);
+
+    /**
+     * Stop this task.
+     */
+    void stop();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java
 
b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java
new file mode 100644
index 0000000..c8f1bad
--- /dev/null
+++ 
b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.errors;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * CopycatException is the top-level exception type generated by Copycat and 
connectors.
+ */
+@InterfaceStability.Unstable
+public class CopycatException extends KafkaException {
+
+    public CopycatException(String s) {
+        super(s);
+    }
+
+    public CopycatException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public CopycatException(Throwable throwable) {
+        super(throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkConnector.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkConnector.java 
b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkConnector.java
new file mode 100644
index 0000000..fb2e694
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkConnector.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.sink;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.copycat.connector.Connector;
+
+/**
+ * SinkConnectors implement the Connector interface to send Kafka data to 
another system.
+ */
+@InterfaceStability.Unstable
+public abstract class SinkConnector extends Connector {
+
+    /**
+     * <p>
+     * Configuration key for the list of input topics for this connector.
+     * </p>
+     * <p>
+     * Usually this setting is only relevant to the Copycat framework, but is 
provided here for
+     * the convenience of Connector developers if they also need to know the 
set of topics.
+     * </p>
+     */
+    public static final String TOPICS_CONFIG = "topics";
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java 
b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java
new file mode 100644
index 0000000..e3775b3
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.sink;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.copycat.connector.CopycatRecord;
+
+/**
+ * SinkRecord is a CopycatRecord that has been read from Kafka and includes 
the kafkaOffset of
+ * the record in the Kafka topic-partition in addition to the standard fields. 
This information
+ * should be used by the SinkTask to coordinate kafkaOffset commits.
+ */
+@InterfaceStability.Unstable
+public class SinkRecord extends CopycatRecord {
+    private final long kafkaOffset;
+
+    public SinkRecord(String topic, int partition, Object key, Object value, 
long kafkaOffset) {
+        super(topic, partition, key, value);
+        this.kafkaOffset = kafkaOffset;
+    }
+
+    public long getKafkaOffset() {
+        return kafkaOffset;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        if (!super.equals(o))
+            return false;
+
+        SinkRecord that = (SinkRecord) o;
+
+        if (kafkaOffset != that.kafkaOffset)
+            return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + (int) (kafkaOffset ^ (kafkaOffset >>> 32));
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "SinkRecord{" +
+                "kafkaOffset=" + kafkaOffset +
+                "} " + super.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java 
b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
new file mode 100644
index 0000000..49fbbe9
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.copycat.sink;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.copycat.connector.Task;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * SinkTask is a Task that takes records loaded from Kafka and sends them to another system. In
+ * addition to the basic {@link #put} interface, SinkTasks must also implement {@link #flush}
+ * to support offset commits.
+ */
+@InterfaceStability.Unstable
+public abstract class SinkTask implements Task {
+
+    /**
+     * <p>
+     * The configuration key that provides the list of topics that are inputs 
for this
+     * SinkTask.
+     * </p>
+     */
+    public static final String TOPICS_CONFIG = "topics";
+
+    protected SinkTaskContext context;
+
+    public void initialize(SinkTaskContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Put the records in the sink. Usually this should send the records to 
the sink asynchronously
+     * and immediately return.
+     *
+     * @param records the set of records to send
+     */
+    public abstract void put(Collection<SinkRecord> records);
+
+    /**
+     * Flush all records that have been {@link #put} for the specified 
topic-partitions. The
+     * offsets are provided for convenience, but could also be determined by 
tracking all offsets
+     * included in the SinkRecords passed to {@link #put}.
+     *
+     * @param offsets mapping of TopicPartition to committed offset
+     */
+    public abstract void flush(Map<TopicPartition, Long> offsets);
+}
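
Illustration (not part of the patch): the flush() Javadoc above notes that a task could determine offsets itself by tracking the records passed to put(). A minimal sketch of that bookkeeping follows. OffsetTracker is a hypothetical name, and it keys on a plain "topic-partition" string instead of Kafka's TopicPartition so that the example stands alone.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in, not part of this patch: remembers the highest offset
// seen per topic-partition, as a SinkTask might do inside put().
class OffsetTracker {
    private final Map<String, Long> latest = new HashMap<>();

    // Call once per record with the record's topic, partition and Kafka offset.
    void record(String topic, int partition, long offset) {
        String key = topic + "-" + partition;
        Long prev = latest.get(key);
        if (prev == null || offset > prev)
            latest.put(key, offset);
    }

    // Returns a copy so callers cannot mutate the tracker's internal state.
    Map<String, Long> snapshot() {
        return new HashMap<>(latest);
    }
}
```

In a real SinkTask, put() would presumably feed each SinkRecord's getTopic(), getKafkaPartition() and getKafkaOffset() into such a tracker, and flush() could compare the snapshot against the offsets map it receives.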

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java 
b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
new file mode 100644
index 0000000..7cc6109
--- /dev/null
+++ 
b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.sink;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Context passed to SinkTasks, allowing them to access utilities in the Copycat runtime.
+ */
+@InterfaceStability.Unstable
+public abstract class SinkTaskContext {
+    private Map<TopicPartition, Long> offsets;
+
+    public SinkTaskContext() {
+        offsets = new HashMap<>();
+    }
+
+    /**
+     * Reset the consumer offsets for the given topic partitions. SinkTasks 
should use this when they are started
+     * if they manage offsets in the sink data store rather than using Kafka 
consumer offsets. For example, an HDFS
+     * connector might record offsets in HDFS to provide exactly once 
delivery. When the SinkTask is started or
+     * a rebalance occurs, the task would reload offsets from HDFS and use 
this method to reset the consumer to those
+     * offsets.
+     *
+     * SinkTasks that do not manage their own offsets do not need to use this 
method.
+     *
+     * @param offsets map of offsets for topic partitions
+     */
+    public void resetOffset(Map<TopicPartition, Long> offsets) {
+        this.offsets = offsets;
+    }
+
+    /**
+     * Get offsets that the SinkTask has submitted to be reset. Used by the 
Copycat framework.
+     * @return the map of offsets
+     */
+    public Map<TopicPartition, Long> getOffsets() {
+        return offsets;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceConnector.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceConnector.java
 
b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceConnector.java
new file mode 100644
index 0000000..7258cdf
--- /dev/null
+++ 
b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceConnector.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.copycat.source;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.copycat.connector.Connector;
+
+/**
+ * SourceConnectors implement the connector interface to pull data from another system and send
+ * it to Kafka.
+ */
+@InterfaceStability.Unstable
+public abstract class SourceConnector extends Connector {
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
new file mode 100644
index 0000000..2085f66
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.source;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.copycat.connector.CopycatRecord;
+
+/**
+ * <p>
+ * SourceRecords are generated by SourceTasks and passed to Copycat for storage in
+ * Kafka. In addition to the standard fields in CopycatRecord which specify where data is stored
+ * in Kafka, they also include a sourcePartition and sourceOffset.
+ * </p>
+ * <p>
+ * The sourcePartition represents a single input partition that the record came from (e.g. a filename, table
+ * name, or topic-partition). The sourceOffset represents a position in that sourcePartition which can be used
+ * to resume consumption of data.
+ * </p>
+ * <p>
+ * These values can have arbitrary structure and should be represented using
+ * org.apache.kafka.copycat.data objects (or primitive values). For example, a database connector
+ * might specify the sourcePartition as a record containing { "db": "database_name", "table":
+ * "table_name"} and the sourceOffset as a Long containing the timestamp of the row.
+ * </p>
+ */
+@InterfaceStability.Unstable
+public class SourceRecord extends CopycatRecord {
+    private final Object sourcePartition;
+    private final Object sourceOffset;
+
+    public SourceRecord(Object sourcePartition, Object sourceOffset, String topic, Integer partition, Object value) {
+        this(sourcePartition, sourceOffset, topic, partition, null, value);
+    }
+
+    public SourceRecord(Object sourcePartition, Object sourceOffset, String topic, Object value) {
+        this(sourcePartition, sourceOffset, topic, null, null, value);
+    }
+
+    public SourceRecord(Object sourcePartition, Object sourceOffset, String topic, Integer partition,
+                        Object key, Object value) {
+        super(topic, partition, key, value);
+        this.sourcePartition = sourcePartition;
+        this.sourceOffset = sourceOffset;
+    }
+
+    public Object getSourcePartition() {
+        return sourcePartition;
+    }
+
+    public Object getSourceOffset() {
+        return sourceOffset;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        if (!super.equals(o))
+            return false;
+
+        SourceRecord that = (SourceRecord) o;
+
+        if (sourceOffset != null ? !sourceOffset.equals(that.sourceOffset) : that.sourceOffset != null)
+            return false;
+        if (sourcePartition != null ? !sourcePartition.equals(that.sourcePartition) : that.sourcePartition != null)
+            return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + (sourcePartition != null ? sourcePartition.hashCode() : 0);
+        result = 31 * result + (sourceOffset != null ? sourceOffset.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "SourceRecord{" +
+                "sourcePartition=" + sourcePartition +
+                ", sourceOffset=" + sourceOffset +
+                "} " + super.toString();
+    }
+}
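
Illustrative note, not part of the patch: the database example from the class Javadoc can be made concrete with a small sketch. `SketchSourcePosition` is a hypothetical stand-in that carries only the two source-side fields of `SourceRecord`, and `forRow` is an invented helper. The sketch also shows that the null-safe comparisons written out by hand in `equals` and `hashCode` could alternatively be expressed with `java.util.Objects` (Java 7+). Note that `Objects.hash` is not guaranteed to produce the same hash values as the patch's own formula.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in holding only the source-side fields of SourceRecord.
class SketchSourcePosition {
    final Object sourcePartition;
    final Object sourceOffset;

    SketchSourcePosition(Object sourcePartition, Object sourceOffset) {
        this.sourcePartition = sourcePartition;
        this.sourceOffset = sourceOffset;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        SketchSourcePosition that = (SketchSourcePosition) o;
        // Objects.equals handles the null cases that the patch spells out manually.
        return Objects.equals(sourcePartition, that.sourcePartition)
                && Objects.equals(sourceOffset, that.sourceOffset);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sourcePartition, sourceOffset);
    }
}

class SourcePositionSketch {
    // Partition is a {"db", "table"} record, offset is the row timestamp as a Long.
    static SketchSourcePosition forRow(String db, String table, long rowTimestamp) {
        Map<String, String> partition = new HashMap<>();
        partition.put("db", db);
        partition.put("table", table);
        return new SketchSourcePosition(partition, rowTimestamp);
    }
}
```

Two positions built from the same database, table and timestamp compare equal, which is what lets a framework deduplicate or look up stored offsets by partition.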

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
new file mode 100644
index 0000000..1e1da34
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.source;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.copycat.connector.Task;
+
+import java.util.List;
+
+/**
+ * SourceTask is a Task that pulls records from another system for storage in Kafka.
+ */
+@InterfaceStability.Unstable
+public abstract class SourceTask implements Task {
+
+    protected SourceTaskContext context;
+
+    /**
+     * Initialize this SourceTask with the specified context object.
+     */
+    public void initialize(SourceTaskContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Poll this SourceTask for new records. This method should block if no data is currently
+     * available.
+     *
+     * @return a list of source records
+     */
+    public abstract List<SourceRecord> poll() throws InterruptedException;
+
+    /**
+     * <p>
+     * Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This
+     * method should block until the commit is complete.
+     * </p>
+     * <p>
+     * SourceTasks are not required to implement this functionality; Copycat will record offsets
+     * automatically. This hook is provided for source systems that also need to store offsets
+     * themselves.
+     * </p>
+     */
+    public void commit() throws InterruptedException {
+        // This space intentionally left blank.
+    }
+}
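
Illustrative note, not part of the patch: one way to satisfy the "block if no data is currently available" contract of `poll()` is to back the task with a `java.util.concurrent.BlockingQueue`. The class below is a hypothetical sketch that uses plain strings in place of `SourceRecord` so it compiles on its own. `offer` stands for whatever thread or callback reads from the external system.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of a blocking poll(); Strings stand in for SourceRecords.
class QueueBackedPollSketch {
    private final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();

    // Called by the code that reads from the external system.
    void offer(String line) {
        incoming.add(line);
    }

    // Blocks until at least one element is available, then drains anything
    // else that is already queued so the caller gets a batch.
    List<String> poll() throws InterruptedException {
        List<String> batch = new ArrayList<>();
        batch.add(incoming.take());
        incoming.drainTo(batch);
        return batch;
    }
}
```

Because `take()` is interruptible, the framework can still stop the task by interrupting the polling thread, which matches the `throws InterruptedException` in the signature above.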

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
new file mode 100644
index 0000000..d52fd62
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.copycat.source;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.copycat.storage.OffsetStorageReader;
+
+/**
+ * SourceTaskContext is provided to SourceTasks to allow them to interact with the underlying
+ * runtime.
+ */
+@InterfaceStability.Unstable
+public class SourceTaskContext {
+    private final OffsetStorageReader reader;
+
+    public SourceTaskContext(OffsetStorageReader reader) {
+        this.reader = reader;
+    }
+
+    /**
+     * Get the OffsetStorageReader for this SourceTask.
+     */
+    public OffsetStorageReader getOffsetStorageReader() {
+        return reader;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
new file mode 100644
index 0000000..c50aee7
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * The Converter interface provides support for translating between Copycat's runtime data format
+ * and the "native" runtime format used by the serialization layer. This is used to translate
+ * two types of data: records and offsets. The (de)serialization is performed by a separate
+ * component -- the producer or consumer serializer or deserializer for records or a Copycat
+ * serializer or deserializer for offsets.
+ */
+@InterfaceStability.Unstable
+public interface Converter<T> {
+
+    /**
+     * Convert a Copycat data object to a native object for serialization.
+     * @param value the Copycat data object to convert
+     * @return the native object to pass to the serialization layer
+     */
+    T fromCopycatData(Object value);
+
+    /**
+     * Convert a native object to a Copycat data object.
+     * @param value the native object produced by the serialization layer
+     * @return the equivalent Copycat data object
+     */
+    Object toCopycatData(T value);
+}
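
Illustrative note, not part of the patch: the two directions of the Converter interface can be seen in a toy implementation. `SketchConverter` below is a hypothetical copy of the interface shape so the example compiles standalone, and `LongAsStringConverter` is invented for illustration. It assumes the runtime value is a `Long` and the "native" form handed to a serializer is its decimal string.

```java
// Hypothetical copy of the Converter shape so the sketch has no Copycat dependency.
interface SketchConverter<T> {
    T fromCopycatData(Object value);

    Object toCopycatData(T value);
}

// Toy converter: runtime data is a Long, the native form is its decimal String.
class LongAsStringConverter implements SketchConverter<String> {
    @Override
    public String fromCopycatData(Object value) {
        // Assumption: callers only pass Long (or null); anything else fails the cast.
        return value == null ? null : Long.toString((Long) value);
    }

    @Override
    public Object toCopycatData(String value) {
        return value == null ? null : Long.valueOf(value);
    }
}
```

A real converter would handle the full runtime data model. The point here is only that conversion and (de)serialization are separate steps, with the converter producing an object that a serializer then turns into bytes.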

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
new file mode 100644
index 0000000..785660d
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * OffsetStorageReader provides access to the offset storage used by sources. This can be used by
+ * connectors to determine offsets to start consuming data from. This is most commonly used during
+ * initialization of a task, but can also be used during runtime, e.g. when reconfiguring a task.
+ */
+@InterfaceStability.Unstable
+public interface OffsetStorageReader {
+    /**
+     * Get the offset for the specified partition. If the data isn't already available locally, this
+     * gets it from the backing store, which may require some network round trips.
+     *
+     * @param partition object uniquely identifying the partition of data
+     * @return object uniquely identifying the offset in the partition of data
+     */
+    Object getOffset(Object partition);
+
+    /**
+     * <p>
+     * Get a set of offsets for the specified partition identifiers. This may be more efficient
+     * than calling {@link #getOffset(Object)} repeatedly.
+     * </p>
+     * <p>
+     * Note that when errors occur, this method omits the associated data and tries to return as
+     * many of the requested values as possible. This allows a task that's managing many partitions to
+     * still proceed with any available data. Therefore, implementations should take care to check
+     * that the data is actually available in the returned response. The only case when an
+     * exception will be thrown is if the entire request failed, e.g. because the underlying
+     * storage was unavailable.
+     * </p>
+     *
+     * @param partitions set of identifiers for partitions of data
+     * @return a map of partition identifiers to decoded offsets
+     */
+    Map<Object, Object> getOffsets(Collection<Object> partitions);
+}
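
Illustrative note, not part of the patch: since `getOffsets` may omit entries, a caller has to decide what a missing entry means. The sketch below uses a hypothetical in-memory lookup in place of a real OffsetStorageReader and, as an assumption made only for this example, starts from offset 0 whenever no offset comes back. A real connector might prefer to retry or fail instead, because a missing entry can also indicate an error.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

class OffsetLookupSketch {
    // Hypothetical stand-in for getOffsets(): returns entries only for
    // partitions present in the given in-memory store.
    static Map<Object, Object> getOffsets(Map<Object, Object> store, Collection<Object> partitions) {
        Map<Object, Object> found = new HashMap<>();
        for (Object partition : partitions) {
            if (store.containsKey(partition))
                found.put(partition, store.get(partition));
        }
        return found;
    }

    // Checks each requested partition against the response instead of
    // assuming every entry is present. Missing entries default to 0 here,
    // which is a policy choice of this sketch, not of the API.
    static Map<Object, Long> startingPositions(Map<Object, Object> returned, Collection<Object> partitions) {
        Map<Object, Long> positions = new HashMap<>();
        for (Object partition : partitions) {
            Object offset = returned.get(partition);
            positions.put(partition, offset == null ? 0L : (Long) offset);
        }
        return positions;
    }
}
```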

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/util/ConnectorUtils.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/util/ConnectorUtils.java b/copycat/api/src/main/java/org/apache/kafka/copycat/util/ConnectorUtils.java
new file mode 100644
index 0000000..f9dd53a
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/util/ConnectorUtils.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utilities that connector implementations might find useful. Contains common building blocks
+ * for writing connectors.
+ */
+@InterfaceStability.Unstable
+public class ConnectorUtils {
+    /**
+     * Given a list of elements and a target number of groups, generates a list of groups of
+     * elements to match the target number of groups, spreading them evenly among the groups.
+     * This generates groups with contiguous elements, which results in intuitive ordering if
+     * your elements are also ordered (e.g. alphabetical lists of table names if you sort
+     * table names alphabetically to generate the raw partitions) or can result in efficient
+     * partitioning if elements are sorted according to some criterion that affects performance
+     * (e.g. topic partitions with the same leader).
+     *
+     * @param elements list of elements to partition
+     * @param numGroups the number of output groups to generate.
+     */
+    public static <T> List<List<T>> groupPartitions(List<T> elements, int numGroups) {
+        if (numGroups <= 0)
+            throw new IllegalArgumentException("Number of groups must be positive.");
+
+        List<List<T>> result = new ArrayList<>(numGroups);
+
+        // Each group has either n+1 or n raw partitions
+        int perGroup = elements.size() / numGroups;
+        int leftover = elements.size() - (numGroups * perGroup);
+
+        int assigned = 0;
+        for (int group = 0; group < numGroups; group++) {
+            int numThisGroup = group < leftover ? perGroup + 1 : perGroup;
+            List<T> groupList = new ArrayList<>(numThisGroup);
+            for (int i = 0; i < numThisGroup; i++) {
+                groupList.add(elements.get(assigned));
+                assigned++;
+            }
+            result.add(groupList);
+        }
+
+        return result;
+    }
+}
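
Illustrative note, not part of the patch: the "n+1 or n" arithmetic in `groupPartitions` is easiest to check by looking at group sizes alone. The helper below is a hypothetical, standalone re-statement of that arithmetic (the name `groupSizes` is invented) and does not call the patch's code.

```java
import java.util.ArrayList;
import java.util.List;

class GroupSizesSketch {
    // Sizes of the groups when `total` elements are split into `numGroups`
    // contiguous groups: the first `leftover` groups get one extra element.
    static List<Integer> groupSizes(int total, int numGroups) {
        if (numGroups <= 0)
            throw new IllegalArgumentException("Number of groups must be positive.");

        int perGroup = total / numGroups;
        int leftover = total - (numGroups * perGroup);

        List<Integer> sizes = new ArrayList<>(numGroups);
        for (int group = 0; group < numGroups; group++)
            sizes.add(group < leftover ? perGroup + 1 : perGroup);
        return sizes;
    }
}
```

For five elements this gives sizes [3, 2] for two groups, [2, 2, 1] for three groups, and [1, 1, 1, 1, 1, 0, 0] for seven groups, so requesting more groups than elements yields trailing empty groups rather than an error.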

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
new file mode 100644
index 0000000..e7ad2f3
--- /dev/null
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.connector;
+
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConnectorReconfigurationTest {
+
+    @Test
+    public void testDefaultReconfigure() throws Exception {
+        TestConnector conn = new TestConnector(false);
+        conn.reconfigure(new Properties());
+        assertEquals(0, conn.stopOrder);
+        assertEquals(1, conn.configureOrder);
+    }
+
+    @Test(expected = CopycatException.class)
+    public void testReconfigureStopException() throws Exception {
+        TestConnector conn = new TestConnector(true);
+        conn.reconfigure(new Properties());
+    }
+
+    private static class TestConnector extends Connector {
+        private boolean stopException;
+        private int order = 0;
+        public int stopOrder = -1;
+        public int configureOrder = -1;
+
+        public TestConnector(boolean stopException) {
+            this.stopException = stopException;
+        }
+
+        @Override
+        public void start(Properties props) {
+            configureOrder = order++;
+        }
+
+        @Override
+        public Class<? extends Task> getTaskClass() {
+            return null;
+        }
+
+        @Override
+        public List<Properties> getTaskConfigs(int count) {
+            return null;
+        }
+
+        @Override
+        public void stop() {
+            stopOrder = order++;
+            if (stopException)
+                throw new CopycatException("error");
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/test/java/org/apache/kafka/copycat/util/ConnectorUtilsTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/util/ConnectorUtilsTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/util/ConnectorUtilsTest.java
new file mode 100644
index 0000000..e46967b
--- /dev/null
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/util/ConnectorUtilsTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConnectorUtilsTest {
+
+    private static final List<Integer> FIVE_ELEMENTS = Arrays.asList(1, 2, 3, 4, 5);
+
+    @Test
+    public void testGroupPartitions() {
+
+        List<List<Integer>> grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 1);
+        assertEquals(Arrays.asList(FIVE_ELEMENTS), grouped);
+
+        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 2);
+        assertEquals(Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5)), grouped);
+
+        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 3);
+        assertEquals(Arrays.asList(Arrays.asList(1, 2),
+                Arrays.asList(3, 4),
+                Arrays.asList(5)), grouped);
+
+        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 5);
+        assertEquals(Arrays.asList(Arrays.asList(1),
+                Arrays.asList(2),
+                Arrays.asList(3),
+                Arrays.asList(4),
+                Arrays.asList(5)), grouped);
+
+        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 7);
+        assertEquals(Arrays.asList(Arrays.asList(1),
+                Arrays.asList(2),
+                Arrays.asList(3),
+                Arrays.asList(4),
+                Arrays.asList(5),
+                Collections.EMPTY_LIST,
+                Collections.EMPTY_LIST), grouped);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testGroupPartitionsInvalidCount() {
+        ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 0);
+    }
+}
