Repository: kafka Updated Branches: refs/heads/trunk a8ccdc615 -> c6b8de4e6
KAFKA-2807: Move ThroughputThrottler back to tools jar to fix upgrade tests. Author: Ewen Cheslack-Postava <m...@ewencp.org> Reviewers: Gwen Shapira Closes #499 from ewencp/kafka-2807-relocate-throughput-throttler Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c6b8de4e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c6b8de4e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c6b8de4e Branch: refs/heads/trunk Commit: c6b8de4e6806d8f9f4af57e15f2a7f4170265c42 Parents: a8ccdc6 Author: Ewen Cheslack-Postava <m...@ewencp.org> Authored: Wed Nov 11 15:55:12 2015 -0800 Committer: Gwen Shapira <csh...@gmail.com> Committed: Wed Nov 11 15:55:12 2015 -0800 ---------------------------------------------------------------------- build.gradle | 45 +++--- .../kafka/common/utils/ThroughputThrottler.java | 141 ------------------- .../connect/tools/VerifiableSourceTask.java | 2 +- settings.gradle | 2 +- .../apache/kafka/tools/ProducerPerformance.java | 1 - .../apache/kafka/tools/ThroughputThrottler.java | 141 +++++++++++++++++++ .../apache/kafka/tools/VerifiableProducer.java | 1 - 7 files changed, 166 insertions(+), 167 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 70fdbcd..0ee6c41 100644 --- a/build.gradle +++ b/build.gradle @@ -230,7 +230,7 @@ for ( sv in ['2_10_5', '2_11_7'] ) { } } -def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file', 'connect:tools'] +def connectPkgs = ['connect-api', 'connect-runtime', 'connect-json', 'connect-file', 'connect-tools'] def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams'] + connectPkgs tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {} @@ -321,7 +321,7 @@ project(':core') { standardOutput = new File('docs/kafka_config.html').newOutputStream() } - task siteDocsTar(dependsOn: ['genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs', ':connect:runtime:genConnectConfigDocs'], type: Tar) { + task siteDocsTar(dependsOn: ['genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs', ':connect-runtime:genConnectConfigDocs'], type: Tar) { classifier = 'site-docs' compression = Compression.GZIP from project.file("../docs") @@ -342,16 +342,16 @@ project(':core') { from(project.siteDocsTar) { into("site-docs/") } from(project(':tools').jar) { into("libs/") } from(project(':tools').configurations.runtime) { into("libs/") } - from(project(':connect:api').jar) { into("libs/") } - from(project(':connect:api').configurations.runtime) { into("libs/") } - from(project(':connect:runtime').jar) { into("libs/") } - from(project(':connect:runtime').configurations.runtime) { into("libs/") } - from(project(':connect:json').jar) { into("libs/") } - from(project(':connect:json').configurations.runtime) { into("libs/") } - from(project(':connect:file').jar) { into("libs/") } - from(project(':connect:file').configurations.runtime) { into("libs/") } - from(project(':connect:tools').jar) { into("libs/") } - from(project(':connect:tools').configurations.runtime) { into("libs/") } + from(project(':connect-api').jar) { into("libs/") } + from(project(':connect-api').configurations.runtime) { into("libs/") } + from(project(':connect-runtime').jar) { into("libs/") } + from(project(':connect-runtime').configurations.runtime) { into("libs/") } + from(project(':connect-json').jar) { into("libs/") } + from(project(':connect-json').configurations.runtime) { into("libs/") } + from(project(':connect-file').jar) { into("libs/") } + from(project(':connect-file').configurations.runtime) { into("libs/") } + from(project(':connect-tools').jar) { into("libs/") } + from(project(':connect-tools').configurations.runtime) { into("libs/") } } jar { @@ -638,7 +638,7 @@ project(':log4j-appender') { test.dependsOn('checkstyleMain', 'checkstyleTest') } -project(':connect:api') { +project(':connect-api') { apply plugin: 'checkstyle' archivesBaseName = "connect-api" @@ -695,12 +695,12 @@ project(':connect:api') { test.dependsOn('checkstyleMain', 'checkstyleTest') } -project(':connect:json') { +project(':connect-json') { apply plugin: 'checkstyle' archivesBaseName = "connect-json" dependencies { - compile project(':connect:api') + compile project(':connect-api') compile "$slf4japi" compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version" @@ -756,12 +756,12 @@ project(':connect:json') { test.dependsOn('checkstyleMain', 'checkstyleTest') } -project(':connect:runtime') { +project(':connect-runtime') { apply plugin: 'checkstyle' archivesBaseName = "connect-runtime" dependencies { - compile project(':connect:api') + compile project(':connect-api') compile project(':clients') compile "$slf4japi" @@ -776,7 +776,7 @@ project(':connect:runtime') { testCompile "$powermock_easymock" testCompile project(':clients').sourceSets.test.output testRuntime "$slf4jlog4j" - testRuntime project(":connect:json") + testRuntime project(":connect-json") } task testJar(type: Jar) { @@ -830,12 +830,12 @@ project(':connect:runtime') { } } -project(':connect:file') { +project(':connect-file') { apply plugin: 'checkstyle' archivesBaseName = "connect-file" dependencies { - compile project(':connect:api') + compile project(':connect-api') compile "$slf4japi" testCompile "$junit" @@ -890,12 +890,13 @@ project(':connect:file') { test.dependsOn('checkstyleMain', 'checkstyleTest') } -project(':connect:tools') { +project(':connect-tools') { apply plugin: 'checkstyle' archivesBaseName = "connect-tools" dependencies { - compile project(':connect:api') + compile project(':connect-api') + compile project(':tools') compile "$slf4japi" compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version" http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java b/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java deleted file mode 100644 index 1c63ffb..0000000 --- a/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.common.utils; - - -/** - * This class helps producers throttle throughput. - * - * If targetThroughput >= 0, the resulting average throughput will be approximately - * min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0, - * no throttling will occur. - * - * To use, do this between successive send attempts: - * <pre> - * {@code - * if (throttler.shouldThrottle(...)) { - * throttler.throttle(); - * } - * } - * </pre> - * - * Note that this can be used to throttle message throughput or data throughput. - */ -public class ThroughputThrottler { - - private static final long NS_PER_MS = 1000000L; - private static final long NS_PER_SEC = 1000 * NS_PER_MS; - private static final long MIN_SLEEP_NS = 2 * NS_PER_MS; - - long sleepTimeNs; - long sleepDeficitNs = 0; - long targetThroughput = -1; - long startMs; - private boolean wakeup = false; - - /** - * @param targetThroughput Can be messages/sec or bytes/sec - * @param startMs When the very first message is sent - */ - public ThroughputThrottler(long targetThroughput, long startMs) { - this.startMs = startMs; - this.targetThroughput = targetThroughput; - this.sleepTimeNs = targetThroughput > 0 ? - NS_PER_SEC / targetThroughput : - Long.MAX_VALUE; - } - - /** - * @param amountSoFar bytes produced so far if you want to throttle data throughput, or - * messages produced so far if you want to throttle message throughput. - * @param sendStartMs timestamp of the most recently sent message - * @return - */ - public boolean shouldThrottle(long amountSoFar, long sendStartMs) { - if (this.targetThroughput < 0) { - // No throttling in this case - return false; - } - - float elapsedMs = (sendStartMs - startMs) / 1000.f; - return elapsedMs > 0 && (amountSoFar / elapsedMs) > this.targetThroughput; - } - - /** - * Occasionally blocks for small amounts of time to achieve targetThroughput. - * - * Note that if targetThroughput is 0, this will block extremely aggressively. - */ - public void throttle() { - if (targetThroughput == 0) { - try { - synchronized (this) { - while (!wakeup) { - this.wait(); - } - } - } catch (InterruptedException e) { - // do nothing - } - return; - } - - // throttle throughput by sleeping, on average, - // (1 / this.throughput) seconds between "things sent" - sleepDeficitNs += sleepTimeNs; - - // If enough sleep deficit has accumulated, sleep a little - if (sleepDeficitNs >= MIN_SLEEP_NS) { - long sleepStartNs = System.nanoTime(); - long currentTimeNs = sleepStartNs; - try { - synchronized (this) { - long elapsed = currentTimeNs - sleepStartNs; - long remaining = sleepDeficitNs - elapsed; - while (!wakeup && remaining > 0) { - long sleepMs = remaining / 1000000; - long sleepNs = remaining - sleepMs * 1000000; - this.wait(sleepMs, (int) sleepNs); - elapsed = System.nanoTime() - sleepStartNs; - remaining = sleepDeficitNs - elapsed; - } - wakeup = false; - } - sleepDeficitNs = 0; - } catch (InterruptedException e) { - // If sleep is cut short, reduce deficit by the amount of - // time we actually spent sleeping - long sleepElapsedNs = System.nanoTime() - sleepStartNs; - if (sleepElapsedNs <= sleepDeficitNs) { - sleepDeficitNs -= sleepElapsedNs; - } - } - } - } - - /** - * Wakeup the throttler if its sleeping. - */ - public void wakeup() { - synchronized (this) { - wakeup = true; - this.notifyAll(); - } - } -} - http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java ---------------------------------------------------------------------- diff --git a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java index 6fee2c4..a85a0e9 100644 --- a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java +++ b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java @@ -19,11 +19,11 @@ package org.apache.kafka.connect.tools; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.kafka.common.utils.ThroughputThrottler; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; +import org.apache.kafka.tools.ThroughputThrottler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/settings.gradle ---------------------------------------------------------------------- diff --git a/settings.gradle b/settings.gradle index 2728b5b..d1543c3 100644 --- a/settings.gradle +++ b/settings.gradle @@ -15,4 +15,4 @@ apply from: file('scala.gradle') include 'core', 'examples', 'clients', 'tools', 'streams', 'log4j-appender', - 'connect:api', 'connect:runtime', 'connect:json', 'connect:file', 'connect:tools' + 'connect-api', 'connect-runtime', 'connect-json', 'connect-file', 'connect-tools' http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index 2a7f7b1..3a06862 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -24,7 +24,6 @@ import net.sourceforge.argparse4j.inf.ArgumentParserException; import net.sourceforge.argparse4j.inf.Namespace; import org.apache.kafka.clients.producer.*; -import org.apache.kafka.common.utils.ThroughputThrottler; public class ProducerPerformance { http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java b/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java new file mode 100644 index 0000000..a3bcd2f --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.tools; + + +/** + * This class helps producers throttle throughput. + * + * If targetThroughput >= 0, the resulting average throughput will be approximately + * min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0, + * no throttling will occur. + * + * To use, do this between successive send attempts: + * <pre> + * {@code + * if (throttler.shouldThrottle(...)) { + * throttler.throttle(); + * } + * } + * </pre> + * + * Note that this can be used to throttle message throughput or data throughput. + */ +public class ThroughputThrottler { + + private static final long NS_PER_MS = 1000000L; + private static final long NS_PER_SEC = 1000 * NS_PER_MS; + private static final long MIN_SLEEP_NS = 2 * NS_PER_MS; + + long sleepTimeNs; + long sleepDeficitNs = 0; + long targetThroughput = -1; + long startMs; + private boolean wakeup = false; + + /** + * @param targetThroughput Can be messages/sec or bytes/sec + * @param startMs When the very first message is sent + */ + public ThroughputThrottler(long targetThroughput, long startMs) { + this.startMs = startMs; + this.targetThroughput = targetThroughput; + this.sleepTimeNs = targetThroughput > 0 ? + NS_PER_SEC / targetThroughput : + Long.MAX_VALUE; + } + + /** + * @param amountSoFar bytes produced so far if you want to throttle data throughput, or + * messages produced so far if you want to throttle message throughput. + * @param sendStartMs timestamp of the most recently sent message + * @return + */ + public boolean shouldThrottle(long amountSoFar, long sendStartMs) { + if (this.targetThroughput < 0) { + // No throttling in this case + return false; + } + + float elapsedMs = (sendStartMs - startMs) / 1000.f; + return elapsedMs > 0 && (amountSoFar / elapsedMs) > this.targetThroughput; + } + + /** + * Occasionally blocks for small amounts of time to achieve targetThroughput. + * + * Note that if targetThroughput is 0, this will block extremely aggressively. + */ + public void throttle() { + if (targetThroughput == 0) { + try { + synchronized (this) { + while (!wakeup) { + this.wait(); + } + } + } catch (InterruptedException e) { + // do nothing + } + return; + } + + // throttle throughput by sleeping, on average, + // (1 / this.throughput) seconds between "things sent" + sleepDeficitNs += sleepTimeNs; + + // If enough sleep deficit has accumulated, sleep a little + if (sleepDeficitNs >= MIN_SLEEP_NS) { + long sleepStartNs = System.nanoTime(); + long currentTimeNs = sleepStartNs; + try { + synchronized (this) { + long elapsed = currentTimeNs - sleepStartNs; + long remaining = sleepDeficitNs - elapsed; + while (!wakeup && remaining > 0) { + long sleepMs = remaining / 1000000; + long sleepNs = remaining - sleepMs * 1000000; + this.wait(sleepMs, (int) sleepNs); + elapsed = System.nanoTime() - sleepStartNs; + remaining = sleepDeficitNs - elapsed; + } + wakeup = false; + } + sleepDeficitNs = 0; + } catch (InterruptedException e) { + // If sleep is cut short, reduce deficit by the amount of + // time we actually spent sleeping + long sleepElapsedNs = System.nanoTime() - sleepStartNs; + if (sleepElapsedNs <= sleepDeficitNs) { + sleepDeficitNs -= sleepElapsedNs; + } + } + } + } + + /** + * Wakeup the throttler if its sleeping. + */ + public void wakeup() { + synchronized (this) { + wakeup = true; + this.notifyAll(); + } + } +} + http://git-wip-us.apache.org/repos/asf/kafka/blob/c6b8de4e/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java index e8bd330..0cd90c0 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java @@ -41,7 +41,6 @@ import net.sourceforge.argparse4j.ArgumentParsers; import net.sourceforge.argparse4j.inf.ArgumentParser; import net.sourceforge.argparse4j.inf.ArgumentParserException; import net.sourceforge.argparse4j.inf.Namespace; -import org.apache.kafka.common.utils.ThroughputThrottler; /** * Primarily intended for use with system testing, this producer prints metadata