KAFKA-2366; Initial patch for Copycat

This is an initial patch implementing the basics of Copycat for KIP-26.

The intent here is to start a review of the key pieces of the core API and to get a reasonably functional, baseline, non-distributed implementation of Copycat in place to get things rolling. The current patch has a number of known issues that need to be addressed before a final version:

* Some build-related issues. Specifically, it requires some locally-installed dependencies (see below), ignores checkstyle for the runtime data library because that library is currently lifted from Avro and likely won't last in its current form, and some Gradle task dependencies aren't quite right because I haven't gotten rid of the dependency on `core` (which should now be an easy patch since new consumer groups are in a much better state).
* This patch currently depends on some Confluent trunk code because I prototyped with our Avro serializers with schema-registry support. We need to figure out what we want to provide as an example built-in set of serializers. Unlike core Kafka, where we could ignore the issue by providing only ByteArray or String serializers, this is pretty central to how Copycat works.
* This patch uses a hacked-up version of Avro as its runtime data format. I'm not sure we want to go through the entire API discussion just to get some basic code committed, so I filed KAFKA-2367 to handle that separately. The core connector APIs and the runtime data APIs are entirely orthogonal.
* This patch needs some updates to get aligned with recent new consumer changes (specifically, I'm aware of the ConcurrentModificationException issue on exit). More generally, the new consumer is in flux but Copycat depends on it, so there are likely to be some negative interactions.
* The layout feels a bit awkward to me right now because I ported it from a Maven layout. We don't have nearly the same level of granularity in Kafka currently (core and clients, plus the mostly ignored examples, log4j-appender, and a couple of contribs). We might want to reorganize, although keeping data+api separate from runtime and connector plugins is useful for minimizing dependencies.
* There are a variety of other things (e.g., I'm not happy with the exception hierarchy and how exceptions are currently handled, TopicPartition doesn't really need to be duplicated unless we want Copycat entirely isolated from the Kafka APIs, etc.), but I expect we'll cover those in the review.

Before commenting on the patch, it's probably worth reviewing https://issues.apache.org/jira/browse/KAFKA-2365 and https://issues.apache.org/jira/browse/KAFKA-2366 to get an idea of what I had in mind for a) what we ultimately want with all the Copycat patches and b) what we aim to cover in this initial patch. My hope is that we can use a WIP patch (after the current obvious deficiencies are addressed) while recognizing that we want to make iterative progress with a bunch of subsequent PRs.

Author: Ewen Cheslack-Postava <m...@ewencp.org>

Reviewers: Ismael Juma, Gwen Shapira

Closes #99 from ewencp/copycat and squashes the following commits:

a3a47a6 [Ewen Cheslack-Postava] Simplify Copycat exceptions, make them a subclass of KafkaException.
8c108b0 [Ewen Cheslack-Postava] Rename Coordinator to Herder to avoid confusion with the consumer coordinator.
7bf8075 [Ewen Cheslack-Postava] Make Copycat CLI specific to standalone mode, clean up some config and get rid of config storage in standalone mode.
656a003 [Ewen Cheslack-Postava] Clarify and expand the explanation of the Copycat Coordinator interface.
c0e5fdc [Ewen Cheslack-Postava] Merge remote-tracking branch 'origin/trunk' into copycat
0fa7a36 [Ewen Cheslack-Postava] Mark Copycat classes as unstable and reduce visibility of some classes where possible.
d55d31e [Ewen Cheslack-Postava] Reorganize Copycat code to put it all under one top-level directory.
b29cb2c [Ewen Cheslack-Postava] Merge remote-tracking branch 'origin/trunk' into copycat
d713a21 [Ewen Cheslack-Postava] Address Gwen's review comments.
6787a85 [Ewen Cheslack-Postava] Make Converter generic to match serializers since some serialization formats do not require a base class of Object; update many other classes to have generic key and value class type parameters to match this change.
b194c73 [Ewen Cheslack-Postava] Split Copycat converter option into two options for key and value.
0b5a1a0 [Ewen Cheslack-Postava] Normalize naming to use partition for both source and Kafka, adjusting naming in CopycatRecord classes to clearly differentiate.
e345142 [Ewen Cheslack-Postava] Remove Copycat reflection utils, use existing Utils and ConfigDef functionality from clients package.
be5c387 [Ewen Cheslack-Postava] Minor cleanup
122423e [Ewen Cheslack-Postava] Style cleanup
6ba87de [Ewen Cheslack-Postava] Remove most of the Avro-based mock runtime data API, only preserving enough schema functionality to support basic primitive types for an initial patch.
4674d13 [Ewen Cheslack-Postava] Address review comments, clean up some code styling.
25b5739 [Ewen Cheslack-Postava] Fix sink task offset commit concurrency issue by moving it to the worker thread and waking up the consumer to ensure it exits promptly.
0aefe21 [Ewen Cheslack-Postava] Add log4j settings for Copycat.
220e42d [Ewen Cheslack-Postava] Replace Avro serializer with JSON serializer.
1243a7c [Ewen Cheslack-Postava] Merge remote-tracking branch 'origin/trunk' into copycat
5a618c6 [Ewen Cheslack-Postava] Remove offset serializers, instead reusing the existing serializers and removing schema projection support.
e849e10 [Ewen Cheslack-Postava] Remove duplicated TopicPartition implementation.
dec1379 [Ewen Cheslack-Postava] Switch to using new consumer coordinator instead of manually assigning partitions. Remove dependency of copycat-runtime on core.
4a9b4f3 [Ewen Cheslack-Postava] Add some helpful Copycat-specific build and test targets that cover all Copycat packages.
31cd1ca [Ewen Cheslack-Postava] Add CLI tools for Copycat.
e14942c [Ewen Cheslack-Postava] Add Copycat file connector.
0233456 [Ewen Cheslack-Postava] Add copycat-avro and copycat-runtime
11981d2 [Ewen Cheslack-Postava] Add copycat-data and copycat-api

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f6acfb08
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f6acfb08
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f6acfb08

Branch: refs/heads/trunk
Commit: f6acfb08917946f15cb8de2fee786019124af212
Parents: c8e62c9
Author: Ewen Cheslack-Postava <m...@ewencp.org>
Authored: Fri Aug 14 16:00:51 2015 -0700
Committer: Gwen Shapira <csh...@gmail.com>
Committed: Fri Aug 14 16:00:51 2015 -0700

----------------------------------------------------------------------
 bin/copycat-standalone.sh | 23 +
 bin/kafka-run-class.sh | 8 +
 build.gradle | 269 +-
 checkstyle/import-control.xml | 57 +
 .../org/apache/kafka/common/utils/Utils.java | 15 +-
 config/copycat-console-sink.properties | 19 +
 config/copycat-console-source.properties | 19 +
 config/copycat-file-sink.properties | 20 +
 config/copycat-file-source.properties | 20 +
 config/copycat-log4j.properties | 23 +
 config/copycat-standalone.properties | 28 +
 .../kafka/copycat/connector/Connector.java | 117 +
 .../copycat/connector/ConnectorContext.java | 33 +
 .../kafka/copycat/connector/CopycatRecord.java | 103 +
 .../apache/kafka/copycat/connector/Task.java | 49 +
 .../kafka/copycat/errors/CopycatException.java | 40 +
 .../kafka/copycat/sink/SinkConnector.java | 40 +
 .../apache/kafka/copycat/sink/SinkRecord.java | 71 +
 .../org/apache/kafka/copycat/sink/SinkTask.java | 64 +
 .../kafka/copycat/sink/SinkTaskContext.java | 59 +
 .../kafka/copycat/source/SourceConnector.java | 29 +
 .../kafka/copycat/source/SourceRecord.java | 103 +
 .../apache/kafka/copycat/source/SourceTask.java | 62 +
 .../kafka/copycat/source/SourceTaskContext.java | 40 +
 .../apache/kafka/copycat/storage/Converter.java | 45 +
 .../copycat/storage/OffsetStorageReader.java | 59 +
 .../kafka/copycat/util/ConnectorUtils.java | 66 +
 .../connector/ConnectorReconfigurationTest.java | 76 +
 .../kafka/copycat/util/ConnectorUtilsTest.java | 67 +
 .../copycat/data/DataRuntimeException.java | 36 +
 .../kafka/copycat/data/DataTypeException.java | 33 +
 .../kafka/copycat/data/ObjectProperties.java | 85 +
 .../org/apache/kafka/copycat/data/Schema.java | 1054 ++++++++
 .../kafka/copycat/data/SchemaBuilder.java | 2415 ++++++++++++++++++
 .../copycat/data/SchemaParseException.java | 32 +
 .../copycat/file/FileStreamSinkConnector.java | 62 +
 .../kafka/copycat/file/FileStreamSinkTask.java | 79 +
 .../copycat/file/FileStreamSourceConnector.java | 70 +
 .../copycat/file/FileStreamSourceTask.java | 176 ++
 .../file/FileStreamSinkConnectorTest.java | 85 +
 .../copycat/file/FileStreamSinkTaskTest.java | 67 +
 .../file/FileStreamSourceConnectorTest.java | 104 +
 .../copycat/file/FileStreamSourceTaskTest.java | 140 +
 .../kafka/copycat/json/JsonConverter.java | 265 ++
 .../kafka/copycat/json/JsonDeserializer.java | 87 +
 .../apache/kafka/copycat/json/JsonSchema.java | 114 +
 .../kafka/copycat/json/JsonSerializer.java | 72 +
 .../kafka/copycat/json/JsonConverterTest.java | 173 ++
 .../kafka/copycat/cli/CopycatStandalone.java | 87 +
 .../apache/kafka/copycat/cli/WorkerConfig.java | 141 +
 .../kafka/copycat/runtime/ConnectorConfig.java | 87 +
 .../apache/kafka/copycat/runtime/Copycat.java | 94 +
 .../apache/kafka/copycat/runtime/Herder.java | 67 +
 .../copycat/runtime/SinkTaskContextImpl.java | 24 +
 .../runtime/SourceTaskOffsetCommitter.java | 103 +
 .../apache/kafka/copycat/runtime/Worker.java | 236 ++
 .../kafka/copycat/runtime/WorkerSinkTask.java | 226 ++
 .../copycat/runtime/WorkerSinkTaskThread.java | 112 +
 .../kafka/copycat/runtime/WorkerSourceTask.java | 310 +++
 .../kafka/copycat/runtime/WorkerTask.java | 54 +
 .../standalone/StandaloneConnectorContext.java | 42 +
 .../runtime/standalone/StandaloneHerder.java | 257 ++
 .../copycat/storage/FileOffsetBackingStore.java | 111 +
 .../storage/MemoryOffsetBackingStore.java | 113 +
 .../copycat/storage/OffsetBackingStore.java | 74 +
 .../storage/OffsetStorageReaderImpl.java | 114 +
 .../copycat/storage/OffsetStorageWriter.java | 208 ++
 .../org/apache/kafka/copycat/util/Callback.java | 31 +
 .../kafka/copycat/util/ConnectorTaskId.java | 71 +
 .../kafka/copycat/util/FutureCallback.java | 76 +
 .../kafka/copycat/util/ShutdownableThread.java | 145 ++
 .../copycat/runtime/WorkerSinkTaskTest.java | 367 +++
 .../copycat/runtime/WorkerSourceTaskTest.java | 279 ++
 .../kafka/copycat/runtime/WorkerTest.java | 179 ++
 .../standalone/StandaloneHerderTest.java | 186 ++
 .../storage/FileOffsetBackingStoreTest.java | 117 +
 .../storage/OffsetStorageWriterTest.java | 242 ++
 .../org/apache/kafka/copycat/util/MockTime.java | 49 +
 .../copycat/util/ShutdownableThreadTest.java | 72 +
 .../TestBackgroundThreadExceptionHandler.java | 37 +
 .../apache/kafka/copycat/util/ThreadedTest.java | 43 +
 settings.gradle | 4 +-
 82 files changed, 11116 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/bin/copycat-standalone.sh
----------------------------------------------------------------------
diff --git a/bin/copycat-standalone.sh b/bin/copycat-standalone.sh
new file mode 100755
index 0000000..b219f8a
--- /dev/null
+++ b/bin/copycat-standalone.sh
@@ -0,0 +1,23 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname $0)
+
+if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
+    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/copycat-log4j.properties"
+fi
+
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.copycat.cli.CopycatStandalone "$@"

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 2f00f68..b689b2e 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -66,6 +66,14 @@ do
   CLASSPATH=$CLASSPATH:$file
 done
 
+for cc_pkg in "data" "api" "runtime" "file" "json"
+do
+  for file in $base_dir/copycat/${cc_pkg}/build/libs/copycat-${cc_pkg}*.jar $base_dir/copycat/${cc_pkg}/build/dependant-libs/*.jar;
+  do
+    CLASSPATH=$CLASSPATH:$file
+  done
+done
+
 # classpath addition for release
 for file in $base_dir/libs/*.jar;
 do

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 1b67e62..864427b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -28,6 +28,11 @@ buildscript {
 }
 
 def slf4jlog4j='org.slf4j:slf4j-log4j12:1.7.6'
+def slf4japi="org.slf4j:slf4j-api:1.7.6"
+def junit='junit:junit:4.6'
+def easymock='org.easymock:easymock:3.3.1'
+def powermock='org.powermock:powermock-module-junit4:1.6.2'
+def powermock_easymock='org.powermock:powermock-api-easymock:1.6.2'
 
 allprojects {
   apply plugin: 'idea'
@@ -59,7 +64,7 @@ rat {
   // And some of the files that we have checked in should also be excluded from this check
   excludes.addAll([
     '**/.git/**',
-    'build/**',
+    '**/build/**',
     'CONTRIBUTING.md',
     'gradlew',
     'gradlew.bat',
@@ -204,20 +209,25 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
   }
 }
 
-tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar', 'tools:jar']) {
+def copycatPkgs = ['copycat:data', 'copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file']
+def pkgs = ['clients', 'examples', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'log4j-appender', 'tools'] + copycatPkgs
+
+tasks.create(name: "jarCopycat", dependsOn: copycatPkgs.collect { it + ":jar" }) {}
+tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7'] + pkgs.collect { it + ":jar" }) {
 }
 
-tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_10_5', 'srcJar_2_11_7', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar', 'tools:srcJar']) { }
+tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_10_5', 'srcJar_2_11_7'] + pkgs.collect { it + ":srcJar" }) { }
 
-tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_10_5', 'docsJar_2_11_7', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar', 'tools:docsJar']) { }
+tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_10_5', 'docsJar_2_11_7'] + pkgs.collect { it + ":docsJar" }) { }
 
-tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7', 'clients:test', 'log4j-appender:test', 'tools:test']) {
+tasks.create(name: "testCopycat", dependsOn: copycatPkgs.collect { it + ":test" }) {}
+tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7'] + pkgs.collect { it + ":test" }) {
 }
 
 tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_10_5', 'releaseTarGz_2_11_7']) {
 }
 
-tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives', 'tools:uploadArchives']) {
+tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7'] + pkgs.collect { it + ":uploadArchives" }) {
 }
 
 project(':core') {
@@ -239,8 +249,8 @@ project(':core') {
     compile 'org.scala-lang.modules:scala-parser-combinators_2.11:1.0.4'
   }
 
-  testCompile 'junit:junit:4.6'
-  testCompile 'org.easymock:easymock:3.0'
+  testCompile "$junit"
+  testCompile "$easymock"
   testCompile 'org.objenesis:objenesis:1.2'
   testCompile "org.scalatest:scalatest_$baseScalaVersion:2.2.5"
@@ -371,11 +381,11 @@ project(':clients') {
   archivesBaseName = "kafka-clients"
 
   dependencies {
-    compile "org.slf4j:slf4j-api:1.7.6"
+    compile "$slf4japi"
     compile 'org.xerial.snappy:snappy-java:1.1.1.7'
     compile 'net.jpountz.lz4:lz4:1.2.0'
 
-    testCompile 'junit:junit:4.6'
+    testCompile "$junit"
     testRuntime "$slf4jlog4j"
   }
@@ -423,7 +433,7 @@ project(':tools') {
     compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4'
     compile "$slf4jlog4j"
 
-    testCompile 'junit:junit:4.6'
+    testCompile "$junit"
     testCompile project(path: ':clients', configuration: 'archives')
   }
@@ -471,7 +481,7 @@ project(':log4j-appender') {
     compile project(':clients')
     compile "$slf4jlog4j"
 
-    testCompile 'junit:junit:4.6'
+    testCompile "$junit"
     testCompile project(path: ':clients', configuration: 'archives')
   }
@@ -496,3 +506,238 @@ project(':log4j-appender') {
   }
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 }
+
+project(':copycat:data') {
+  apply plugin: 'checkstyle'
+  archivesBaseName = "copycat-data"
+
+  dependencies {
+    compile project(':clients')
+    compile "$slf4japi"
+
+    testCompile "$junit"
+    testRuntime "$slf4jlog4j"
+  }
+
+  task testJar(type: Jar) {
+    classifier = 'test'
+    from sourceSets.test.output
+  }
+
+  test {
+    testLogging {
+      events "passed", "skipped", "failed"
+      exceptionFormat = 'full'
+    }
+  }
+
+  javadoc {
+    include "**/org/apache/kafka/copycat/data/*"
+  }
+
+  artifacts {
+    archives testJar
+  }
+
+  configurations {
+    archives.extendsFrom (testCompile)
+  }
+
+  /* FIXME Re-enable this with KAFKA-2367 when the placeholder data API is replaced
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
+  test.dependsOn('checkstyleMain', 'checkstyleTest') */
+}
+
+project(':copycat:api') {
+  apply plugin: 'checkstyle'
+  archivesBaseName = "copycat-api"
+
+  dependencies {
+    compile project(':copycat:data')
+    compile "$slf4japi"
+
+    testCompile "$junit"
+    testRuntime "$slf4jlog4j"
+  }
+
+  task testJar(type: Jar) {
+    classifier = 'test'
+    from sourceSets.test.output
+  }
+
+  test {
+    testLogging {
+      events "passed", "skipped", "failed"
+      exceptionFormat = 'full'
+    }
+  }
+
+  javadoc {
+    include "**/org/apache/kafka/copycat/*"
+  }
+
+  artifacts {
+    archives testJar
+  }
+
+  configurations {
+    archives.extendsFrom (testCompile)
+  }
+
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
+  test.dependsOn('checkstyleMain', 'checkstyleTest')
+}
+
+project(':copycat:json') {
+  apply plugin: 'checkstyle'
+  archivesBaseName = "copycat-json"
+
+  dependencies {
+    compile project(':copycat:api')
+    compile "$slf4japi"
+    compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4'
+
+    testCompile "$junit"
+    testCompile "$easymock"
+    testCompile "$powermock"
+    testCompile "$powermock_easymock"
+    testRuntime "$slf4jlog4j"
+  }
+
+  task testJar(type: Jar) {
+    classifier = 'test'
+    from sourceSets.test.output
+  }
+
+  test {
+    testLogging {
+      events "passed", "skipped", "failed"
+      exceptionFormat = 'full'
+    }
+  }
+
+  javadoc {
+    include "**/org/apache/kafka/copycat/*"
+  }
+
+  artifacts {
+    archives testJar
+  }
+
+  configurations {
+    archives.extendsFrom(testCompile)
+  }
+
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
+  test.dependsOn('checkstyleMain', 'checkstyleTest')
+
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.runtime) {
+      exclude('kafka-clients*')
+      exclude('copycat-*')
+    }
+    into "$buildDir/dependant-libs"
+  }
+
+  jar {
+    dependsOn copyDependantLibs
+  }
+}
+
+project(':copycat:runtime') {
+  apply plugin: 'checkstyle'
+  archivesBaseName = "copycat-runtime"
+
+  dependencies {
+    compile project(':copycat:api')
+    compile project(':clients')
+    compile "$slf4japi"
+
+    testCompile "$junit"
+    testCompile "$easymock"
+    testCompile "$powermock"
+    testCompile "$powermock_easymock"
+    testRuntime "$slf4jlog4j"
+    testRuntime project(":copycat:json")
+  }
+
+  task testJar(type: Jar) {
+    classifier = 'test'
+    from sourceSets.test.output
+  }
+
+  test {
+    testLogging {
+      events "passed", "skipped", "failed"
+      exceptionFormat = 'full'
+    }
+  }
+
+  javadoc {
+    include "**/org/apache/kafka/copycat/*"
+  }
+
+  artifacts {
+    archives testJar
+  }
+
+  configurations {
+    archives.extendsFrom(testCompile)
+  }
+
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
+  test.dependsOn('checkstyleMain', 'checkstyleTest')
+}
+
+project(':copycat:file') {
+  apply plugin: 'checkstyle'
+  archivesBaseName = "copycat-file"
+
+  dependencies {
+    compile project(':copycat:api')
+    compile "$slf4japi"
+
+    testCompile "$junit"
+    testCompile "$easymock"
+    testCompile "$powermock"
+    testCompile "$powermock_easymock"
+    testRuntime "$slf4jlog4j"
+  }
+
+  task testJar(type: Jar) {
+    classifier = 'test'
+    from sourceSets.test.output
+  }
+
+  test {
+    testLogging {
+      events "passed", "skipped", "failed"
+      exceptionFormat = 'full'
+    }
+  }
+
+  javadoc {
+    include "**/org/apache/kafka/copycat/*"
+  }
+
+  artifacts {
+    archives testJar
+  }
+
+  configurations {
+    archives.extendsFrom(testCompile)
+  }
+
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
+  test.dependsOn('checkstyleMain', 'checkstyleTest')
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 18be1bb..e3f4f84 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -108,4 +108,61 @@
     <allow pkg="org.apache.kafka" />
   </subpackage>
 
+  <subpackage name="copycat">
+    <allow pkg="org.apache.kafka.common" />
+    <allow pkg="org.apache.kafka.copycat.data" />
+    <allow pkg="org.apache.kafka.copycat.errors" />
+
+    <subpackage name="source">
+      <allow pkg="org.apache.kafka.copycat.connector" />
+      <allow pkg="org.apache.kafka.copycat.storage" />
+    </subpackage>
+
+    <subpackage name="sink">
+      <allow pkg="org.apache.kafka.copycat.connector" />
+      <allow pkg="org.apache.kafka.copycat.storage" />
+    </subpackage>
+
+    <subpackage name="runtime">
+      <allow pkg="org.apache.kafka.copycat" />
+      <allow pkg="org.apache.kafka.clients" />
+      <!-- for tests -->
+      <allow pkg="org.easymock" />
+      <allow pkg="org.powermock" />
+    </subpackage>
+
+    <subpackage name="cli">
+      <allow pkg="org.apache.kafka.copycat.runtime" />
+      <allow pkg="org.apache.kafka.copycat.util" />
+      <allow pkg="org.apache.kafka.common" />
+    </subpackage>
+
+    <subpackage name="storage">
+      <allow pkg="org.apache.kafka.copycat" />
+      <allow pkg="org.apache.kafka.common.serialization" />
+      <!-- for tests -->
+      <allow pkg="org.easymock" />
+      <allow pkg="org.powermock" />
+    </subpackage>
+
+    <subpackage name="util">
+      <allow pkg="org.apache.kafka.copycat" />
+    </subpackage>
+
+    <subpackage name="json">
+      <allow pkg="com.fasterxml.jackson" />
+      <allow pkg="org.apache.kafka.common.serialization" />
+      <allow pkg="org.apache.kafka.common.errors" />
+      <allow pkg="org.apache.kafka.copycat.storage" />
+    </subpackage>
+
+    <subpackage name="file">
+      <allow pkg="org.apache.kafka.copycat" />
+      <!-- for tests -->
+      <allow pkg="org.easymock" />
+      <allow pkg="org.powermock" />
+    </subpackage>
+
+  </subpackage>
+
 </import-control>

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index af9993c..80a914e 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -271,17 +271,30 @@ public class Utils {
     /**
      * Instantiate the class
      */
-    public static Object newInstance(Class<?> c) {
+    public static <T> T newInstance(Class<T> c) {
         try {
             return c.newInstance();
         } catch (IllegalAccessException e) {
             throw new KafkaException("Could not instantiate class " + c.getName(), e);
         } catch (InstantiationException e) {
             throw new KafkaException("Could not instantiate class " + c.getName() + " Does it have a public no-argument constructor?", e);
+        } catch (NullPointerException e) {
+            throw new KafkaException("Requested class was null", e);
         }
     }
 
     /**
+     * Look up the class by name and instantiate it.
+     * @param klass class name
+     * @param base super class of the class to be instantiated
+     * @param <T>
+     * @return the new instance
+     */
+    public static <T> T newInstance(String klass, Class<T> base) throws ClassNotFoundException {
+        return Utils.newInstance(Class.forName(klass).asSubclass(base));
+    }
+
+    /**
      * Generates 32 bit murmur2 hash from byte array
      * @param data byte array to hash
      * @return 32 bit hash of the given array

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/config/copycat-console-sink.properties
----------------------------------------------------------------------
diff --git a/config/copycat-console-sink.properties b/config/copycat-console-sink.properties
new file mode 100644
index 0000000..4cd4c33
--- /dev/null
+++ b/config/copycat-console-sink.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=local-console-sink
+connector.class=org.apache.kafka.copycat.file.FileStreamSinkConnector
+tasks.max=1
+topics=test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/config/copycat-console-source.properties
----------------------------------------------------------------------
diff --git a/config/copycat-console-source.properties b/config/copycat-console-source.properties
new file mode 100644
index 0000000..17dbbf9
--- /dev/null
+++ b/config/copycat-console-source.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=local-console-source
+connector.class=org.apache.kafka.copycat.file.FileStreamSourceConnector
+tasks.max=1
+topic=test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/config/copycat-file-sink.properties
----------------------------------------------------------------------
diff --git a/config/copycat-file-sink.properties b/config/copycat-file-sink.properties
new file mode 100644
index 0000000..3cc0d62
--- /dev/null
+++ b/config/copycat-file-sink.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=local-file-sink
+connector.class=org.apache.kafka.copycat.file.FileStreamSinkConnector
+tasks.max=1
+file=test.sink.txt
+topics=test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/config/copycat-file-source.properties
----------------------------------------------------------------------
diff --git a/config/copycat-file-source.properties b/config/copycat-file-source.properties
new file mode 100644
index 0000000..7512e50
--- /dev/null
+++ b/config/copycat-file-source.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=local-file-source
+connector.class=org.apache.kafka.copycat.file.FileStreamSourceConnector
+tasks.max=1
+file=test.txt
+topic=test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/config/copycat-log4j.properties
----------------------------------------------------------------------
diff --git a/config/copycat-log4j.properties b/config/copycat-log4j.properties
new file mode 100644
index 0000000..158daed
--- /dev/null
+++ b/config/copycat-log4j.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
+
+log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.I0Itec.zkclient=ERROR
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/config/copycat-standalone.properties
----------------------------------------------------------------------
diff --git a/config/copycat-standalone.properties b/config/copycat-standalone.properties
new file mode 100644
index 0000000..cf3b268
--- /dev/null
+++ b/config/copycat-standalone.properties
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# These are defaults. This file just demonstrates how to override some settings.
+bootstrap.servers=localhost:9092
+
+key.converter=org.apache.kafka.copycat.json.JsonConverter
+value.converter=org.apache.kafka.copycat.json.JsonConverter
+key.serializer=org.apache.kafka.copycat.json.JsonSerializer
+value.serializer=org.apache.kafka.copycat.json.JsonSerializer
+key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
+value.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
+
+offset.storage.file.filename=/tmp/copycat.offsets
+# Flush much faster than normal, which is useful for testing/debugging
+offset.flush.interval.ms=10000

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
new file mode 100644
index 0000000..2ea3c95
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.connector;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * <p>
+ * Connectors manage integration of Copycat with another system, either as an input that ingests
+ * data into Kafka or an output that passes data to an external system. Implementations should
+ * not use this class directly; they should inherit from SourceConnector or SinkConnector.
+ * </p>
+ * <p>
+ * Connectors have two primary tasks. First, given some configuration, they are responsible for
+ * creating configurations for a set of {@link Task}s that split up the data processing. For
+ * example, a database Connector might create Tasks by dividing the set of tables evenly among
+ * tasks. Second, they are responsible for monitoring inputs for changes that require
+ * reconfiguration and notifying the Copycat runtime via the ConnectorContext. Continuing the
+ * previous example, the connector might periodically check for new tables and notify Copycat of
+ * additions and deletions. Copycat will then request new configurations and update the running
+ * Tasks.
+ * </p>
+ */
+@InterfaceStability.Unstable
+public abstract class Connector {
+
+    protected ConnectorContext context;
+
+    /**
+     * Initialize this connector, using the provided ConnectorContext to notify the runtime of
+     * input configuration changes.
+     * @param ctx context object used to interact with the Copycat runtime
+     */
+    public void initialize(ConnectorContext ctx) {
+        context = ctx;
+    }
+
+    /**
+     * <p>
+     * Initialize this connector, using the provided ConnectorContext to notify the runtime of
+     * input configuration changes and using the provided set of Task configurations.
+     * This version is only used to recover from failures.
+     * </p>
+     * <p>
+     * The default implementation ignores the provided Task configurations.
During recovery, Copycat will request + * an updated set of configurations and update the running Tasks appropriately. However, Connectors should + * implement special handling of this case if doing so avoids unnecessary changes to running Tasks. + * </p> + * + * @param ctx context object used to interact with the Copycat runtime + * @param taskConfigs existing task configurations, which may be used when generating new task configs to avoid + * churn in partition to task assignments + */ + public void initialize(ConnectorContext ctx, List<Properties> taskConfigs) { + context = ctx; + // Ignore taskConfigs. May result in more churn of tasks during recovery if updated configs + // are very different, but reduces the difficulty of implementing a Connector + } + + /** + * Start this Connector. This method will only be called on a clean Connector, i.e. it has + * either just been instantiated and initialized or {@link #stop()} has been invoked. + * + * @param props configuration settings + */ + public abstract void start(Properties props); + + /** + * Reconfigure this Connector. Most implementations will not override this, using the default + * implementation that calls {@link #stop()} followed by {@link #start(Properties)}. + * Implementations only need to override this if they want to handle this process more + * efficiently, e.g. without shutting down network connections to the external system. + * + * @param props new configuration settings + */ + public void reconfigure(Properties props) { + stop(); + start(props); + } + + /** + * Returns the Task implementation for this Connector. + */ + public abstract Class<? extends Task> getTaskClass(); + + /** + * Returns a set of configurations for Tasks based on the current configuration, + * producing at most {@code maxTasks} configurations.
+ * + * @param maxTasks maximum number of configurations to generate + * @return configurations for Tasks + */ + public abstract List<Properties> getTaskConfigs(int maxTasks); + + /** + * Stop this connector. + */ + public abstract void stop(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/connector/ConnectorContext.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/ConnectorContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/ConnectorContext.java new file mode 100644 index 0000000..ecba69a --- /dev/null +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/ConnectorContext.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.connector; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * ConnectorContext allows Connectors to proactively interact with the Copycat runtime. + */ +@InterfaceStability.Unstable +public interface ConnectorContext { + /** + * Requests that the runtime reconfigure the Tasks for this source. 
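The Connector javadoc above has each connector divide its input (e.g. a set of database tables) into at most maxTasks Task configurations. A minimal, self-contained sketch of that division in plain Java — class, method, and the "tables" property name are hypothetical stand-ins, not the Copycat API, which returns List&lt;Properties&gt; from getTaskConfigs(int):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: split a connector's inputs (here, table names) into at
// most maxTasks groups, one Properties per Task, as getTaskConfigs(int) would.
public class TaskAssignment {
    public static List<Properties> groupTables(List<String> tables, int maxTasks) {
        int numGroups = Math.min(tables.size(), maxTasks);
        List<Properties> configs = new ArrayList<>();
        for (int i = 0; i < numGroups; i++)
            configs.add(new Properties());
        // Round-robin assignment keeps the groups evenly sized.
        for (int i = 0; i < tables.size(); i++) {
            Properties config = configs.get(i % numGroups);
            String existing = config.getProperty("tables", "");
            config.setProperty("tables",
                    existing.isEmpty() ? tables.get(i) : existing + "," + tables.get(i));
        }
        return configs;
    }
}
```

With three tables and maxTasks=2, this yields two configs ("a,c" and "b"); with one table and maxTasks=4, only one config is produced, which is why the contract says "at most" maxTasks.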
This should be used to + * indicate to the runtime that something about the input/output has changed (e.g. partitions + * added/removed) and the running Tasks will need to be modified. + */ + void requestTaskReconfiguration(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java new file mode 100644 index 0000000..576904a --- /dev/null +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.connector; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * <p> + * Base class for records containing data to be copied to/from Kafka. This corresponds closely to + * Kafka's ProducerRecord and ConsumerRecord classes, and holds the data that may be used by both + * sources and sinks (topic, kafkaPartition, key, value). 
Although both implementations include a + * notion of offset, it is not included here because they differ in type. + * </p> + */ +@InterfaceStability.Unstable +public abstract class CopycatRecord { + private final String topic; + private final Integer kafkaPartition; + private final Object key; + private final Object value; + + public CopycatRecord(String topic, Integer kafkaPartition, Object value) { + this(topic, kafkaPartition, null, value); + } + + public CopycatRecord(String topic, Integer kafkaPartition, Object key, Object value) { + this.topic = topic; + this.kafkaPartition = kafkaPartition; + this.key = key; + this.value = value; + } + + public String getTopic() { + return topic; + } + + public Integer getKafkaPartition() { + return kafkaPartition; + } + + public Object getKey() { + return key; + } + + public Object getValue() { + return value; + } + + @Override + public String toString() { + return "CopycatRecord{" + + "topic='" + topic + '\'' + + ", kafkaPartition=" + kafkaPartition + + ", key=" + key + + ", value=" + value + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + CopycatRecord that = (CopycatRecord) o; + + if (key != null ? !key.equals(that.key) : that.key != null) + return false; + if (kafkaPartition != null ? !kafkaPartition.equals(that.kafkaPartition) : that.kafkaPartition != null) + return false; + if (topic != null ? !topic.equals(that.topic) : that.topic != null) + return false; + if (value != null ? !value.equals(that.value) : that.value != null) + return false; + + return true; + } + + @Override + public int hashCode() { + int result = topic != null ? topic.hashCode() : 0; + result = 31 * result + (kafkaPartition != null ? kafkaPartition.hashCode() : 0); + result = 31 * result + (key != null ? key.hashCode() : 0); + result = 31 * result + (value != null ? 
value.hashCode() : 0); + return result; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java new file mode 100644 index 0000000..cdaba08 --- /dev/null +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.connector; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Properties; + +/** + * <p> + * Tasks contain the code that actually copies data to/from another system. They receive + * a configuration from their parent Connector, assigning them a fraction of a Copycat job's work. + * The Copycat framework then pushes/pulls data from the Task. The Task must also be able to + * respond to reconfiguration requests. 
+ * </p> + * <p> + * Task only contains the minimal shared functionality between + * {@link org.apache.kafka.copycat.source.SourceTask} and + * {@link org.apache.kafka.copycat.sink.SinkTask}. + * </p> + */ +@InterfaceStability.Unstable +public interface Task { + /** + * Start the Task + * @param props initial configuration + */ + void start(Properties props); + + /** + * Stop this task. + */ + void stop(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java new file mode 100644 index 0000000..c8f1bad --- /dev/null +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.copycat.errors; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * CopycatException is the top-level exception type generated by Copycat and connectors. + */ +@InterfaceStability.Unstable +public class CopycatException extends KafkaException { + + public CopycatException(String s) { + super(s); + } + + public CopycatException(String s, Throwable throwable) { + super(s, throwable); + } + + public CopycatException(Throwable throwable) { + super(throwable); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkConnector.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkConnector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkConnector.java new file mode 100644 index 0000000..fb2e694 --- /dev/null +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkConnector.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.copycat.sink; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.copycat.connector.Connector; + +/** + * SinkConnectors implement the Connector interface to send Kafka data to another system. + */ +@InterfaceStability.Unstable +public abstract class SinkConnector extends Connector { + + /** + * <p> + * Configuration key for the list of input topics for this connector. + * </p> + * <p> + * Usually this setting is only relevant to the Copycat framework, but is provided here for + * the convenience of Connector developers if they also need to know the set of topics. + * </p> + */ + public static final String TOPICS_CONFIG = "topics"; + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java new file mode 100644 index 0000000..e3775b3 --- /dev/null +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.sink; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.copycat.connector.CopycatRecord; + +/** + * SinkRecord is a CopycatRecord that has been read from Kafka and includes the kafkaOffset of + * the record in the Kafka topic-partition in addition to the standard fields. This information + * should be used by the SinkTask to coordinate kafkaOffset commits. + */ +@InterfaceStability.Unstable +public class SinkRecord extends CopycatRecord { + private final long kafkaOffset; + + public SinkRecord(String topic, int partition, Object key, Object value, long kafkaOffset) { + super(topic, partition, key, value); + this.kafkaOffset = kafkaOffset; + } + + public long getKafkaOffset() { + return kafkaOffset; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + if (!super.equals(o)) + return false; + + SinkRecord that = (SinkRecord) o; + + if (kafkaOffset != that.kafkaOffset) + return false; + + return true; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (int) (kafkaOffset ^ (kafkaOffset >>> 32)); + return result; + } + + @Override + public String toString() { + return "SinkRecord{" + + "kafkaOffset=" + kafkaOffset + + "} " + super.toString(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java new file mode 100644 index 0000000..49fbbe9 --- /dev/null +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java @@ -0,0 
+1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.copycat.sink; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.copycat.connector.Task; + +import java.util.Collection; +import java.util.Map; + +/** + * SinkTask is a Task that takes records loaded from Kafka and sends them to another system. In + * addition to the basic {@link #put} interface, SinkTasks must also implement {@link #flush} + * to support offset commits. + */ +@InterfaceStability.Unstable +public abstract class SinkTask implements Task { + + /** + * <p> + * The configuration key that provides the list of topics that are inputs for this + * SinkTask. + * </p> + */ + public static final String TOPICS_CONFIG = "topics"; + + protected SinkTaskContext context; + + public void initialize(SinkTaskContext context) { + this.context = context; + } + + /** + * Put the records in the sink. Usually this should send the records to the sink asynchronously + * and immediately return.
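The put/flush contract described above — put() returns immediately while flush() guarantees delivery of everything buffered so offsets can be committed — can be sketched in self-contained plain Java. The Record class here is a hypothetical stand-in for SinkRecord, and topic-partitions are modeled as plain strings rather than TopicPartition:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the put/flush contract: put() only buffers and returns
// immediately; flush() "delivers" everything buffered and reports the next
// offset to commit per topic-partition.
public class BufferingSink {
    // Stand-in for SinkRecord: topic-partition name, offset, and value.
    public static class Record {
        final String topicPartition; final long offset; final String value;
        public Record(String tp, long offset, String value) {
            this.topicPartition = tp; this.offset = offset; this.value = value;
        }
    }

    private final List<Record> buffer = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();

    public void put(List<Record> records) {
        buffer.addAll(records);  // a real task would kick off async delivery here
    }

    public Map<String, Long> flush() {
        Map<String, Long> offsets = new HashMap<>();
        for (Record r : buffer) {
            delivered.add(r.value);
            // Next offset to commit is one past the last delivered record.
            offsets.merge(r.topicPartition, r.offset + 1, Math::max);
        }
        buffer.clear();
        return offsets;  // safe to commit: everything below these offsets is delivered
    }

    public List<String> delivered() { return delivered; }
}
```

The key property is that nothing is considered committed between put() calls; only flush() advances the committable offsets.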
+ * + * @param records the set of records to send + */ + public abstract void put(Collection<SinkRecord> records); + + /** + * Flush all records that have been {@link #put} for the specified topic-partitions. The + * offsets are provided for convenience, but could also be determined by tracking all offsets + * included in the SinkRecords passed to {@link #put}. + * + * @param offsets mapping of TopicPartition to committed offset + */ + public abstract void flush(Map<TopicPartition, Long> offsets); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java new file mode 100644 index 0000000..7cc6109 --- /dev/null +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTaskContext.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.copycat.sink; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.HashMap; +import java.util.Map; + +/** + * Context passed to SinkTasks, allowing them to access utilities in the copycat runtime. + */ +@InterfaceStability.Unstable +public abstract class SinkTaskContext { + private Map<TopicPartition, Long> offsets; + + public SinkTaskContext() { + offsets = new HashMap<>(); + } + + /** + * Reset the consumer offsets for the given topic partitions. SinkTasks should use this when they are started + * if they manage offsets in the sink data store rather than using Kafka consumer offsets. For example, an HDFS + * connector might record offsets in HDFS to provide exactly once delivery. When the SinkTask is started or + * a rebalance occurs, the task would reload offsets from HDFS and use this method to reset the consumer to those + * offsets. + * + * SinkTasks that do not manage their own offsets do not need to use this method. + * + * @param offsets map of offsets for topic partitions + */ + public void resetOffset(Map<TopicPartition, Long> offsets) { + this.offsets = offsets; + } + + /** + * Get offsets that the SinkTask has submitted to be reset. Used by the Copycat framework. 
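The resetOffset() javadoc above describes sinks that keep offsets in their own store (e.g. HDFS) for exactly-once delivery. A minimal sketch of that pattern in self-contained plain Java — the class is hypothetical and an in-memory map stands in for the sink's durable store; topic-partitions are plain strings:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "offsets in the sink's own store": the task persists
// each record's offset atomically with the data, and on (re)start reads the
// offsets back to build the map it would pass to SinkTaskContext.resetOffset().
public class SinkOffsetStore {
    private final Map<String, Long> store = new HashMap<>(); // stands in for HDFS/DB state

    // Write a record and its offset together, as one atomic unit.
    // (The data write itself is elided in this sketch.)
    public void write(String topicPartition, long offset, String value) {
        store.put(topicPartition, offset + 1); // next offset to consume
    }

    // On startup or rebalance: the offsets to reset the consumer to.
    public Map<String, Long> recoveredOffsets() {
        return new HashMap<>(store);
    }
}
```

Because the offset is committed in the same operation as the data, replaying from recoveredOffsets() after a crash neither skips nor duplicates records.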
+ * @return the map of offsets + */ + public Map<TopicPartition, Long> getOffsets() { + return offsets; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceConnector.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceConnector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceConnector.java new file mode 100644 index 0000000..7258cdf --- /dev/null +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceConnector.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.copycat.source; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.copycat.connector.Connector; + +/** + * SourceConnectors implement the connector interface to pull data from another system and send + * it to Kafka. 
+ */ +@InterfaceStability.Unstable +public abstract class SourceConnector extends Connector { + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java new file mode 100644 index 0000000..2085f66 --- /dev/null +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.source; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.copycat.connector.CopycatRecord; + +/** + * <p> + * SourceRecords are generated by SourceTasks and passed to Copycat for storage in + * Kafka. In addition to the standard fields in CopycatRecord which specify where data is stored + * in Kafka, they also include a sourcePartition and sourceOffset. 
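The SourceRecord javadoc below describes structured sourcePartition and sourceOffset values, e.g. a database connector keying its position by a {"db", "table"} record. A self-contained plain-Java sketch of that bookkeeping — the class and its methods are hypothetical; the real API stores these values as Objects on each SourceRecord:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of structured source partitions/offsets: a database
// connector keys its position by a {db, table} map and stores a Long offset
// per partition, so each task can resume where it left off.
public class SourcePositions {
    private final Map<Map<String, String>, Long> offsets = new HashMap<>();

    public static Map<String, String> partition(String db, String table) {
        Map<String, String> p = new HashMap<>();
        p.put("db", db);
        p.put("table", table);
        return p;
    }

    public void record(Map<String, String> partition, long offset) {
        offsets.put(partition, offset);
    }

    // Where to resume reading this partition; -1 means start from the beginning.
    public long resumeFrom(Map<String, String> partition) {
        return offsets.getOrDefault(partition, -1L);
    }
}
```

Using a map as the partition key works because equal {db, table} contents compare equal, which is exactly the property the offset storage relies on.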
+ * </p> + * <p> + * The sourcePartition represents a single input sourcePartition that the record came from (e.g. a filename, table + * name, or topic-partition). The sourceOffset represents a position in that sourcePartition which can be used + * to resume consumption of data. + * </p> + * <p> + * These values can have arbitrary structure and should be represented using + * org.apache.kafka.copycat.data objects (or primitive values). For example, a database connector + * might specify the sourcePartition as a record containing { "db": "database_name", "table": + * "table_name"} and the sourceOffset as a Long containing the timestamp of the row. + * </p> + */ +@InterfaceStability.Unstable +public class SourceRecord extends CopycatRecord { + private final Object sourcePartition; + private final Object sourceOffset; + + public SourceRecord(Object sourcePartition, Object sourceOffset, String topic, Integer partition, Object value) { + this(sourcePartition, sourceOffset, topic, partition, null, value); + } + + public SourceRecord(Object sourcePartition, Object sourceOffset, String topic, Object value) { + this(sourcePartition, sourceOffset, topic, null, null, value); + } + + public SourceRecord(Object sourcePartition, Object sourceOffset, String topic, Integer partition, + Object key, Object value) { + super(topic, partition, key, value); + this.sourcePartition = sourcePartition; + this.sourceOffset = sourceOffset; + } + + public Object getSourcePartition() { + return sourcePartition; + } + + public Object getSourceOffset() { + return sourceOffset; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + if (!super.equals(o)) + return false; + + SourceRecord that = (SourceRecord) o; + + if (sourceOffset != null ? !sourceOffset.equals(that.sourceOffset) : that.sourceOffset != null) + return false; + if (sourcePartition != null ? 
!sourcePartition.equals(that.sourcePartition) : that.sourcePartition != null) + return false; + + return true; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (sourcePartition != null ? sourcePartition.hashCode() : 0); + result = 31 * result + (sourceOffset != null ? sourceOffset.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "SourceRecord{" + + "sourcePartition=" + sourcePartition + + ", sourceOffset=" + sourceOffset + + "} " + super.toString(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java new file mode 100644 index 0000000..1e1da34 --- /dev/null +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTask.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/
+
+package org.apache.kafka.copycat.source;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.copycat.connector.Task;
+
+import java.util.List;
+
+/**
+ * SourceTask is a Task that pulls records from another system for storage in Kafka.
+ */
+@InterfaceStability.Unstable
+public abstract class SourceTask implements Task {
+
+    protected SourceTaskContext context;
+
+    /**
+     * Initialize this SourceTask with the specified context object.
+     */
+    public void initialize(SourceTaskContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Poll this SourceTask for new records. This method should block if no data is currently
+     * available.
+     *
+     * @return a list of source records
+     */
+    public abstract List<SourceRecord> poll() throws InterruptedException;
+
+    /**
+     * <p>
+     * Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This
+     * method should block until the commit is complete.
+     * </p>
+     * <p>
+     * SourceTasks are not required to implement this functionality; Copycat will record offsets
+     * automatically. This hook is provided for systems that also need to store offsets internally
+     * in their own system.
+     * </p>
+     */
+    public void commit() throws InterruptedException {
+        // This space intentionally left blank.
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
new file mode 100644
index 0000000..d52fd62
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceTaskContext.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.copycat.source;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.copycat.storage.OffsetStorageReader;
+
+/**
+ * SourceTaskContext is provided to SourceTasks to allow them to interact with the underlying
+ * runtime.
+ */
+@InterfaceStability.Unstable
+public class SourceTaskContext {
+    private final OffsetStorageReader reader;
+
+    public SourceTaskContext(OffsetStorageReader reader) {
+        this.reader = reader;
+    }
+
+    /**
+     * Get the OffsetStorageReader for this SourceTask.
+     */
+    public OffsetStorageReader getOffsetStorageReader() {
+        return reader;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
new file mode 100644
index 0000000..c50aee7
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * The Converter interface provides support for translating between Copycat's runtime data format
+ * and the "native" runtime format used by the serialization layer. This is used to translate
+ * two types of data: records and offsets. The (de)serialization is performed by a separate
+ * component -- the producer or consumer serializer or deserializer for records or a Copycat
+ * serializer or deserializer for offsets.
+ */
+@InterfaceStability.Unstable
+public interface Converter<T> {
+
+    /**
+     * Convert a Copycat data object to a native object for serialization.
+     * @param value the Copycat data object to convert
+     * @return the data converted to the native runtime format
+     */
+    T fromCopycatData(Object value);
+
+    /**
+     * Convert a native object to a Copycat data object.
+     * @param value the native data object to convert
+     * @return the data converted to Copycat's runtime format
+     */
+    Object toCopycatData(T value);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
new file mode 100644
index 0000000..785660d
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * OffsetStorageReader provides access to the offset storage used by sources. This can be used by
+ * connectors to determine offsets to start consuming data from. This is most commonly used during
+ * initialization of a task, but can also be used during runtime, e.g. when reconfiguring a task.
+ */
+@InterfaceStability.Unstable
+public interface OffsetStorageReader {
+    /**
+     * Get the offset for the specified partition. If the data isn't already available locally, this
+     * gets it from the backing store, which may require some network round trips.
+     *
+     * @param partition object uniquely identifying the partition of data
+     * @return object uniquely identifying the offset in the partition of data
+     */
+    Object getOffset(Object partition);
+
+    /**
+     * <p>
+     * Get a set of offsets for the specified partition identifiers. This may be more efficient
+     * than calling {@link #getOffset(Object)} repeatedly.
+     * </p>
+     * <p>
+     * Note that when errors occur, this method omits the associated data and tries to return as
+     * many of the requested values as possible. This allows a task that's managing many partitions to
+     * still proceed with any available data. Therefore, implementations should take care to check
+     * that the data is actually available in the returned response. The only case when an
+     * exception will be thrown is if the entire request failed, e.g. because the underlying
+     * storage was unavailable.
+     * </p>
+     *
+     * @param partitions set of identifiers for partitions of data
+     * @return a map of partition identifiers to decoded offsets
+     */
+    Map<Object, Object> getOffsets(Collection<Object> partitions);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/main/java/org/apache/kafka/copycat/util/ConnectorUtils.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/util/ConnectorUtils.java b/copycat/api/src/main/java/org/apache/kafka/copycat/util/ConnectorUtils.java
new file mode 100644
index 0000000..f9dd53a
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/util/ConnectorUtils.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utilities that connector implementations might find useful. Contains common building blocks
+ * for writing connectors.
+ */
+@InterfaceStability.Unstable
+public class ConnectorUtils {
+    /**
+     * Given a list of elements and a target number of groups, generates list of groups of
+     * elements to match the target number of groups, spreading them evenly among the groups.
+     * This generates groups with contiguous elements, which results in intuitive ordering if
+     * your elements are also ordered (e.g. alphabetical lists of table names if you sort
+     * table names alphabetically to generate the raw partitions) or can result in efficient
+     * partitioning if elements are sorted according to some criteria that affects performance
+     * (e.g. topic partitions with the same leader).
+     *
+     * @param elements list of elements to partition
+     * @param numGroups the number of output groups to generate.
+     */
+    public static <T> List<List<T>> groupPartitions(List<T> elements, int numGroups) {
+        if (numGroups <= 0)
+            throw new IllegalArgumentException("Number of groups must be positive.");
+
+        List<List<T>> result = new ArrayList<>(numGroups);
+
+        // Each group has either n+1 or n raw partitions
+        int perGroup = elements.size() / numGroups;
+        int leftover = elements.size() - (numGroups * perGroup);
+
+        int assigned = 0;
+        for (int group = 0; group < numGroups; group++) {
+            int numThisGroup = group < leftover ? perGroup + 1 : perGroup;
+            List<T> groupList = new ArrayList<>(numThisGroup);
+            for (int i = 0; i < numThisGroup; i++) {
+                groupList.add(elements.get(assigned));
+                assigned++;
+            }
+            result.add(groupList);
+        }
+
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
new file mode 100644
index 0000000..e7ad2f3
--- /dev/null
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/connector/ConnectorReconfigurationTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.connector;
+
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConnectorReconfigurationTest {
+
+    @Test
+    public void testDefaultReconfigure() throws Exception {
+        TestConnector conn = new TestConnector(false);
+        conn.reconfigure(new Properties());
+        assertEquals(conn.stopOrder, 0);
+        assertEquals(conn.configureOrder, 1);
+    }
+
+    @Test(expected = CopycatException.class)
+    public void testReconfigureStopException() throws Exception {
+        TestConnector conn = new TestConnector(true);
+        conn.reconfigure(new Properties());
+    }
+
+    private static class TestConnector extends Connector {
+        private boolean stopException;
+        private int order = 0;
+        public int stopOrder = -1;
+        public int configureOrder = -1;
+
+        public TestConnector(boolean stopException) {
+            this.stopException = stopException;
+        }
+
+        @Override
+        public void start(Properties props) {
+            configureOrder = order++;
+        }
+
+        @Override
+        public Class<? extends Task> getTaskClass() {
+            return null;
+        }
+
+        @Override
+        public List<Properties> getTaskConfigs(int count) {
+            return null;
+        }
+
+        @Override
+        public void stop() {
+            stopOrder = order++;
+            if (stopException)
+                throw new CopycatException("error");
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/api/src/test/java/org/apache/kafka/copycat/util/ConnectorUtilsTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/util/ConnectorUtilsTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/util/ConnectorUtilsTest.java
new file mode 100644
index 0000000..e46967b
--- /dev/null
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/util/ConnectorUtilsTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConnectorUtilsTest {
+
+    private static final List<Integer> FIVE_ELEMENTS = Arrays.asList(1, 2, 3, 4, 5);
+
+    @Test
+    public void testGroupPartitions() {
+
+        List<List<Integer>> grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 1);
+        assertEquals(Arrays.asList(FIVE_ELEMENTS), grouped);
+
+        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 2);
+        assertEquals(Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5)), grouped);
+
+        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 3);
+        assertEquals(Arrays.asList(Arrays.asList(1, 2),
+                Arrays.asList(3, 4),
+                Arrays.asList(5)), grouped);
+
+        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 5);
+        assertEquals(Arrays.asList(Arrays.asList(1),
+                Arrays.asList(2),
+                Arrays.asList(3),
+                Arrays.asList(4),
+                Arrays.asList(5)), grouped);
+
+        grouped = ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 7);
+        assertEquals(Arrays.asList(Arrays.asList(1),
+                Arrays.asList(2),
+                Arrays.asList(3),
+                Arrays.asList(4),
+                Arrays.asList(5),
+                Collections.EMPTY_LIST,
+                Collections.EMPTY_LIST), grouped);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testGroupPartitionsInvalidCount() {
+        ConnectorUtils.groupPartitions(FIVE_ELEMENTS, 0);
+    }
+}
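The `ConnectorUtils.groupPartitions` logic in this patch can be exercised standalone, which may help when reviewing how the n+1/n group sizes fall out. The following sketch copies the grouping logic verbatim from the patch into a hypothetical `GroupPartitionsDemo` driver class (the class name and `main` method are illustrative, not part of the patch):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GroupPartitionsDemo {
    // Grouping logic copied from ConnectorUtils.groupPartitions in the patch above.
    public static <T> List<List<T>> groupPartitions(List<T> elements, int numGroups) {
        if (numGroups <= 0)
            throw new IllegalArgumentException("Number of groups must be positive.");

        List<List<T>> result = new ArrayList<>(numGroups);

        // The first `leftover` groups get perGroup + 1 elements; the rest get perGroup.
        int perGroup = elements.size() / numGroups;
        int leftover = elements.size() - (numGroups * perGroup);

        int assigned = 0;
        for (int group = 0; group < numGroups; group++) {
            int numThisGroup = group < leftover ? perGroup + 1 : perGroup;
            List<T> groupList = new ArrayList<>(numThisGroup);
            for (int i = 0; i < numThisGroup; i++) {
                groupList.add(elements.get(assigned));
                assigned++;
            }
            result.add(groupList);
        }

        return result;
    }

    public static void main(String[] args) {
        List<Integer> elements = Arrays.asList(1, 2, 3, 4, 5);
        // Groups are contiguous, so ordered inputs produce intuitive groupings.
        System.out.println(groupPartitions(elements, 2)); // [[1, 2, 3], [4, 5]]
        System.out.println(groupPartitions(elements, 3)); // [[1, 2], [3, 4], [5]]
        System.out.println(groupPartitions(elements, 7)); // [[1], [2], [3], [4], [5], [], []]
    }
}
```

Note that when `numGroups` exceeds the element count, the trailing groups come back empty rather than the method throwing, matching the 7-group case in `ConnectorUtilsTest`.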