kafka git commit: KAFKA-2645: Document potentially breaking changes in the release note…
Repository: kafka
Updated Branches:
  refs/heads/trunk e6f9b9e47 -> fc4ef4791

KAFKA-2645: Document potentially breaking changes in the release notes for 0.9.0

Author: Grant Henke
Reviewers: Gwen Shapira, Guozhang Wang

Closes #337 from granthenke/docs

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fc4ef479
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fc4ef479
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fc4ef479

Branch: refs/heads/trunk
Commit: fc4ef479109fbae12470e44f8c12fe9e5f41e179
Parents: e6f9b9e
Author: Grant Henke
Authored: Tue Oct 27 07:43:19 2015 -0700
Committer: Gwen Shapira
Committed: Tue Oct 27 07:43:19 2015 -0700

--
 docs/configuration.html | 6 +++---
 docs/documentation.html | 10 +-
 docs/upgrade.html | 28 +++-
 3 files changed, 31 insertions(+), 13 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc4ef479/docs/configuration.html
--
diff --git a/docs/configuration.html b/docs/configuration.html
index c3cc13e..41cf995 100644
--- a/docs/configuration.html
+++ b/docs/configuration.html
@@ -5,9 +5,9 @@
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License. You may obtain a copy of the License at
-
+
 http://www.apache.org/licenses/LICENSE-2.0
-
+
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -445,7 +445,7 @@ ZooKeeper also allows you to add a "chroot" path which will make all kafka data
 inter.broker.protocol.version
-0.8.3
+0.9.0
 Version of the protocol brokers will use to communicate with each other. This will default for the current version of the broker, but may need to be set to older versions during a rolling upgrade process. In that scenario, upgraded brokers will use the older version of the protocol and therefore will be able to communicate with brokers that were not yet upgraded. See upgrade section for more details.

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc4ef479/docs/documentation.html
--
diff --git a/docs/documentation.html b/docs/documentation.html
index 8f9b081..860f276 100644
--- a/docs/documentation.html
+++ b/docs/documentation.html
@@ -5,9 +5,9 @@
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License. You may obtain a copy of the License at
-
+
 http://www.apache.org/licenses/LICENSE-2.0
-
+
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,10 +17,10 @@
-Kafka 0.8.2 Documentation
-Prior releases: 0.7.x, 0.8.0, 0.8.1.X.
+Kafka 0.9.0 Documentation
+Prior releases: 0.7.x, 0.8.0, 0.8.1.X, 0.8.2.X.
-
+
 1. Getting Started

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc4ef479/docs/upgrade.html
--
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 4b7033a..69bcdc1 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -5,9 +5,9 @@
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License. You may obtain a copy of the License at
-
+
 http://www.apache.org/licenses/LICENSE-2.0
-
+
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,13 +17,13 @@
 1.5 Upgrading From Previous Versions
-Upgrading from 0.8.0, 0.8.1.X or 0.8.2.X to 0.8.3.0
+Upgrading from 0.8.0, 0.8.1.X or 0.8.2.X to 0.9.0.0
-0.8.3.0 has an inter-broker protocol change from previous versions. For a rolling upgrade:
+0.9.0.0 has an inter-broker protocol change from previous versions. For a rolling upgrade:
 Update server.properties file on all brokers and add the following property: inter.broker.protocol.version=0.8.2.X
 Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it.
-Once the entire cluster is upgraded, bump the protocol
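
The first rolling-upgrade step quoted in the upgrade.html diff (adding inter.broker.protocol.version to server.properties on every broker) could be scripted roughly as below. This is only a sketch: the helper name `set_property` is hypothetical, it assumes simple `key=value` lines, and it is not part of Kafka or of this commit.

```python
def set_property(properties_text, key, value):
    """Return properties_text with `key` set to `value`.

    Hypothetical helper: an existing assignment is replaced in place,
    otherwise a new `key=value` line is appended. Only plain `key=value`
    lines are recognised (an assumption; real .properties files allow more).
    """
    out = []
    replaced = False
    for line in properties_text.splitlines():
        stripped = line.strip()
        # Comments and blank lines pass through untouched.
        if stripped and not stripped.startswith("#") and "=" in stripped:
            name = stripped.split("=", 1)[0].strip()
            if name == key:
                if not replaced:
                    out.append("%s=%s" % (key, value))
                    replaced = True
                continue  # drop duplicate assignments of the same key
        out.append(line)
    if not replaced:
        out.append("%s=%s" % (key, value))
    return "\n".join(out) + "\n"


if __name__ == "__main__":
    original = "broker.id=0\nlog.dirs=/tmp/kafka-logs\n"
    # Pin the inter-broker protocol before upgrading any broker.
    print(set_property(original, "inter.broker.protocol.version", "0.8.2.X"))
```

Calling the same helper again with a different value rewrites the existing line rather than appending a second one, which is the behaviour one would want when the property is changed later in the procedure.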
kafka git commit: KAFKA-2452: Add new consumer option to mirror maker.
Repository: kafka
Updated Branches:
  refs/heads/trunk 2e4aed707 -> 2fd645ac2

KAFKA-2452: Add new consumer option to mirror maker.

Author: Jiangjie Qin
Reviewers: Ben Stopford, Guozhang Wang

Closes #266 from becketqin/KAFKA-2452

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2fd645ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2fd645ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2fd645ac

Branch: refs/heads/trunk
Commit: 2fd645ac2fec7cf089cb8175ee47823b67a07226
Parents: 2e4aed7
Author: Jiangjie Qin
Authored: Tue Oct 27 07:59:52 2015 -0700
Committer: Gwen Shapira
Committed: Tue Oct 27 07:59:52 2015 -0700

--
 .../scala/kafka/consumer/BaseConsumer.scala | 12 +-
 .../main/scala/kafka/tools/MirrorMaker.scala | 554 ---
 2 files changed, 373 insertions(+), 193 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fd645ac/core/src/main/scala/kafka/consumer/BaseConsumer.scala
--
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index 8b93493..52cd5fa 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -28,13 +28,15 @@ trait BaseConsumer {
   def receive(): BaseConsumerRecord
   def stop()
   def cleanup()
+  def commit()
 }
 case class BaseConsumerRecord(topic: String, partition: Int, offset: Long, key: Array[Byte], value: Array[Byte])
 class NewShinyConsumer(topic: String, consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
   import org.apache.kafka.clients.consumer.KafkaConsumer
-  import scala.collection.JavaConversions._
+
+import scala.collection.JavaConversions._
   val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
   consumer.subscribe(List(topic))
@@ -58,6 +60,10 @@ class NewShinyConsumer(topic: String, consumerProps: Properties, val timeoutMs:
   override def cleanup() {
     this.consumer.close()
   }
+
+  override def commit() {
+    this.consumer.commitSync()
+  }
 }
 class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends BaseConsumer {
@@ -81,5 +87,9 @@ class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends B
   override def cleanup() {
     this.consumerConnector.shutdown()
   }
+
+  override def commit() {
+    this.consumerConnector.commitOffsets
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2fd645ac/core/src/main/scala/kafka/tools/MirrorMaker.scala
--
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index fbe0c83..3cf754b 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -20,22 +20,27 @@ package kafka.tools
 import java.util
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.regex.Pattern
 import java.util.{Collections, Properties}
 import com.yammer.metrics.core.Gauge
 import joptsimple.OptionParser
-import kafka.consumer.{Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector}
+import kafka.client.ClientUtils
+import kafka.consumer.{BaseConsumerRecord, ConsumerIterator, BaseConsumer, Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector}
 import kafka.javaapi.consumer.ConsumerRebalanceListener
 import kafka.message.MessageAndMetadata
 import kafka.metrics.KafkaMetricsGroup
 import kafka.serializer.DefaultDecoder
 import kafka.utils.{CommandLineUtils, CoreUtils, Logging}
+import org.apache.kafka.clients.consumer.{ConsumerWakeupException, Consumer, ConsumerRecord, KafkaConsumer}
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.utils.Utils
 import scala.collection.JavaConversions._
-
+import scala.util.control.ControlThrowable
 /**
  * The mirror maker has the following architecture:
@@ -56,12 +61,11 @@ import scala.collection.JavaConversions._
  */
 object MirrorMaker extends Logging with KafkaMetricsGroup {
-  private var connectors: Seq[ZookeeperConsumerConnector] = null
   private var producer:
kafka git commit: KAFKA-2516: Rename o.a.k.client.tools to o.a.k.tools
Repository: kafka
Updated Branches:
  refs/heads/trunk fc4ef4791 -> 2e4aed707

KAFKA-2516: Rename o.a.k.client.tools to o.a.k.tools

Author: Grant Henke
Reviewers: Gwen Shapira, Ewen Cheslack-Postava

Closes #310 from granthenke/tools-packaging

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2e4aed70
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2e4aed70
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2e4aed70

Branch: refs/heads/trunk
Commit: 2e4aed7070f0283e2c1e0e563fdb3324482463a5
Parents: fc4ef47
Author: Grant Henke
Authored: Tue Oct 27 07:44:32 2015 -0700
Committer: Gwen Shapira
Committed: Tue Oct 27 07:44:32 2015 -0700

--
 bin/kafka-verifiable-producer.sh | 2 +-
 checkstyle/import-control.xml | 14 +-
 .../kafkatest/services/kafka_log4j_appender.py | 2 +-
 .../performance/producer_performance.py | 2 +-
 .../clients/tools/ProducerPerformance.java | 201
 .../clients/tools/ThroughputThrottler.java | 118 ---
 .../clients/tools/VerifiableLog4jAppender.java | 162 --
 .../kafka/clients/tools/VerifiableProducer.java | 324 ---
 .../apache/kafka/tools/ProducerPerformance.java | 201
 .../apache/kafka/tools/ThroughputThrottler.java | 117 +++
 .../kafka/tools/VerifiableLog4jAppender.java | 162 ++
 .../apache/kafka/tools/VerifiableProducer.java | 324 +++
 12 files changed, 814 insertions(+), 815 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e4aed70/bin/kafka-verifiable-producer.sh
--
diff --git a/bin/kafka-verifiable-producer.sh b/bin/kafka-verifiable-producer.sh
index d0aa6c5..98fe557 100755
--- a/bin/kafka-verifiable-producer.sh
+++ b/bin/kafka-verifiable-producer.sh
@@ -17,4 +17,4 @@
 if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
     export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
-exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.clients.tools.VerifiableProducer $@
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.VerifiableProducer $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e4aed70/checkstyle/import-control.xml
--
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index e1ea93c..187bee8 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -105,14 +105,14 @@
 + - - - - - - - + + + + + +

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e4aed70/tests/kafkatest/services/kafka_log4j_appender.py
--
diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py
index 11369aa..ff6bb18 100644
--- a/tests/kafkatest/services/kafka_log4j_appender.py
+++ b/tests/kafkatest/services/kafka_log4j_appender.py
@@ -38,7 +38,7 @@ class KafkaLog4jAppender(BackgroundThreadService):
     @property
     def start_cmd(self):
-        cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.VerifiableLog4jAppender" \
+        cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableLog4jAppender" \
               " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e4aed70/tests/kafkatest/services/performance/producer_performance.py
--
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index f842026..25911af 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -46,7 +46,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
     def _worker(self, idx, node):
         args = self.args.copy()
         args.update({'bootstrap_servers': self.kafka.bootstrap_servers(), 'jmx_port': self.jmx_port, 'client_id': self.client_id})
-        cmd = "JMX_PORT=%(jmx_port)d /opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance " \
+        cmd = "JMX_PORT=%(jmx_port)d /opt/kafka/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \
               "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" %
kafka git commit: KAFKA-2447: Add capability to KafkaLog4jAppender to be able to use SSL
Repository: kafka
Updated Branches:
  refs/heads/trunk 2fd645ac2 -> d21cb66e7

KAFKA-2447: Add capability to KafkaLog4jAppender to be able to use SSL

Author: Ashish Singh
Reviewers: Gwen Shapira, Ismael Juma

Closes #175 from SinghAsDev/KAFKA-2447

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d21cb66e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d21cb66e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d21cb66e

Branch: refs/heads/trunk
Commit: d21cb66e7d21ed3d20fc1e13b9a856f764bb4237
Parents: 2fd645a
Author: Ashish Singh
Authored: Tue Oct 27 08:45:27 2015 -0700
Committer: Gwen Shapira
Committed: Tue Oct 27 08:45:27 2015 -0700

--
 .../kafka/log4jappender/KafkaLog4jAppender.java | 88 ++--
 1 file changed, 82 insertions(+), 6 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/kafka/blob/d21cb66e/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
--
diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
index 2baef06..94120e2 100644
--- a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
+++ b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
@@ -17,11 +17,14 @@ package org.apache.kafka.log4jappender;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.SSLConfigs;
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.helpers.LogLog;
 import org.apache.log4j.spi.LoggingEvent;
@@ -36,16 +39,28 @@ import java.util.concurrent.Future;
  */
 public class KafkaLog4jAppender extends AppenderSkeleton {
-    private static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
-    private static final String COMPRESSION_TYPE_CONFIG = "compression.type";
-    private static final String ACKS_CONFIG = "acks";
-    private static final String RETRIES_CONFIG = "retries";
-    private static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
-    private static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
+    private static final String BOOTSTRAP_SERVERS_CONFIG = ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+    private static final String COMPRESSION_TYPE_CONFIG = ProducerConfig.COMPRESSION_TYPE_CONFIG;
+    private static final String ACKS_CONFIG = ProducerConfig.ACKS_CONFIG;
+    private static final String RETRIES_CONFIG = ProducerConfig.RETRIES_CONFIG;
+    private static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+    private static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+    private static final String SECURITY_PROTOCOL = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+    private static final String SSL_TRUSTSTORE_LOCATION = SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
+    private static final String SSL_TRUSTSTORE_PASSWORD = SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
+    private static final String SSL_KEYSTORE_TYPE = SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG;
+    private static final String SSL_KEYSTORE_LOCATION = SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
+    private static final String SSL_KEYSTORE_PASSWORD = SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
     private String brokerList = null;
     private String topic = null;
     private String compressionType = null;
+    private String securityProtocol = null;
+    private String sslTruststoreLocation = null;
+    private String sslTruststorePassword = null;
+    private String sslKeystoreType = null;
+    private String sslKeystoreLocation = null;
+    private String sslKeystorePassword = null;
     private int retries = 0;
     private int requiredNumAcks = Integer.MAX_VALUE;
@@ -104,6 +119,54 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
         this.syncSend = syncSend;
     }
+    public String getSslTruststorePassword() {
+        return sslTruststorePassword;
+    }
+
+    public String getSslTruststoreLocation() {
+        return sslTruststoreLocation;
+    }
+
+    public String getSecurityProtocol() {
+        return securityProtocol;
+    }
+
+    public void setSecurityProtocol(String
kafka git commit: HOTFIX: call consumer.poll() even when no task is assigned
Repository: kafka
Updated Branches:
  refs/heads/trunk 38a1b6055 -> af42c3789

HOTFIX: call consumer.poll() even when no task is assigned

StreamThread should keep calling consumer.poll() even when no task is assigned. This is necessary to get a task.

guozhangwang

Author: Yasuhiro Matsuda
Reviewers: Guozhang Wang

Closes #373 from ymatsuda/no_task

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/af42c378
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/af42c378
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/af42c378

Branch: refs/heads/trunk
Commit: af42c37899e8fc66590e7f3c4893f8224441f6a8
Parents: 38a1b60
Author: Yasuhiro Matsuda
Authored: Tue Oct 27 13:57:19 2015 -0700
Committer: Guozhang Wang
Committed: Tue Oct 27 13:57:19 2015 -0700

--
 .../processor/internals/StreamThread.java | 25 +---
 1 file changed, 16 insertions(+), 9 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/kafka/blob/af42c378/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
--
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index abc5c5d..0bf51d7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -265,22 +265,29 @@ public class StreamThread extends Thread {
                     sensors.pollTimeSensor.record(endPoll - startPoll);
                 }
-                // try to process one record from each task
                 totalNumBuffered = 0;
-                requiresPoll = false;
-                for (StreamTask task : tasks.values()) {
-                    long startProcess = time.milliseconds();
+                if (!tasks.isEmpty()) {
+                    // try to process one record from each task
+                    requiresPoll = false;
-                    totalNumBuffered += task.process();
-                    requiresPoll = requiresPoll || task.requiresPoll();
+                    for (StreamTask task : tasks.values()) {
+                        long startProcess = time.milliseconds();
-                    sensors.processTimeSensor.record(time.milliseconds() - startProcess);
+                        totalNumBuffered += task.process();
+                        requiresPoll = requiresPoll || task.requiresPoll();
+
+                        sensors.processTimeSensor.record(time.milliseconds() - startProcess);
+                    }
+
+                    maybePunctuate();
+                    maybeCommit();
+                } else {
+                    // even when no task is assigned, we must poll to get a task.
+                    requiresPoll = true;
                 }
-                maybePunctuate();
                 maybeClean();
-                maybeCommit();
             }
         } catch (Exception e) {
             throw new KafkaException(e);
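
The control flow introduced by this hotfix can be illustrated with a small simulation. It is a simplified model, not the StreamThread code: `FakeTask`, `run_loop` and the `poll` callback are hypothetical names, and the condition under which the real thread decides to poll is assumed to depend only on a `requires_poll` flag.

```python
class FakeTask(object):
    """Hypothetical stand-in for StreamTask that buffers a few records."""

    def __init__(self, records):
        self.records = list(records)

    def process(self):
        # Consume at most one buffered record; report how many remain.
        if self.records:
            self.records.pop(0)
        return len(self.records)

    def requires_poll(self):
        # Ask for another poll once the buffer has drained.
        return not self.records


def run_loop(poll, iterations):
    """Run a simplified thread loop and return how many times poll() ran."""
    tasks = {}
    requires_poll = True
    poll_calls = 0
    for _ in range(iterations):
        if requires_poll:
            poll(tasks)  # may assign tasks, as a rebalance would
            poll_calls += 1
        if tasks:
            # try to process one record from each task
            requires_poll = False
            for task in tasks.values():
                task.process()
                requires_poll = requires_poll or task.requires_poll()
        else:
            # no task assigned: keep polling so that one can be assigned
            requires_poll = True
    return poll_calls
```

With a `poll` callback that never assigns anything, `run_loop` polls on every iteration, which is the behaviour the commit message asks for. Without the `else` branch, `requires_poll` would be reset to false while `tasks` is empty and the loop would never poll again.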
[2/2] kafka git commit: KAFKA-1888: rolling upgrade test
KAFKA-1888: rolling upgrade test

ewencp gwenshap

This needs some refactoring to avoid the duplicated code between replication test and upgrade test, but in shape for initial feedback.

I'm interested in feedback on the added `KafkaConfig` class and `kafka_props` file. This addition makes it:
- easier to attach different configs to different nodes (e.g. during broker upgrade process)
- easier to reason about the configuration of a particular node

Notes:
- in the default values in the KafkaConfig class, I removed many properties which were in kafka.properties before. This is because most of those properties were set to what is already the default value.
- when running non-trunk VerifiableProducer, I append the trunk tools jar to the classpath, and run it with the non-trunk kafka-run-class.sh script

Author: Geoff Anderson
Reviewers: Dong Lin, Ewen Cheslack-Postava

Closes #229 from granders/KAFKA-1888-upgrade-test

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e6b34330
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e6b34330
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e6b34330

Branch: refs/heads/trunk
Commit: e6b343302f3208f7f6e0099fe2a7132ef9eaaafb
Parents: af42c37
Author: Geoff Anderson
Authored: Tue Oct 27 15:23:47 2015 -0700
Committer: Guozhang Wang
Committed: Tue Oct 27 15:23:47 2015 -0700

--
 tests/kafkatest/__init__.py | 10 +
 .../sanity_checks/test_console_consumer.py | 43 ++-
 .../sanity_checks/test_kafka_version.py | 55
 .../sanity_checks/test_verifiable_producer.py | 70 +
 tests/kafkatest/services/console_consumer.py | 109 ---
 tests/kafkatest/services/copycat.py | 9 +-
 tests/kafkatest/services/kafka.py | 253
 tests/kafkatest/services/kafka/__init__.py | 16 +
 tests/kafkatest/services/kafka/config.py | 53
 .../kafkatest/services/kafka/config_property.py | 177 +++
 tests/kafkatest/services/kafka/directory.py | 32 ++
 tests/kafkatest/services/kafka/kafka.py | 303 +++
 .../services/kafka/templates/kafka.properties | 65
 tests/kafkatest/services/kafka/version.py | 61
 .../kafkatest/services/kafka_log4j_appender.py | 13 +-
 tests/kafkatest/services/mirror_maker.py | 5 +-
 tests/kafkatest/services/monitor/__init__.py | 14 +
 tests/kafkatest/services/monitor/jmx.py | 90 ++
 .../performance/consumer_performance.py | 8 +-
 .../services/performance/end_to_end_latency.py | 8 +-
 .../kafkatest/services/performance/jmx_mixin.py | 81 -
 .../performance/producer_performance.py | 44 +--
 .../services/templates/kafka.properties | 74 -
 tests/kafkatest/services/verifiable_producer.py | 88 +-
 tests/kafkatest/services/zookeeper.py | 8 +-
 .../kafkatest/tests/produce_consume_validate.py | 106 +++
 tests/kafkatest/tests/quota_test.py | 20 +-
 tests/kafkatest/tests/replication_test.py | 207 +
 tests/kafkatest/tests/upgrade_test.py | 81 +
 tests/kafkatest/utils/__init__.py | 4 +-
 tests/kafkatest/utils/util.py | 42 +++
 tests/setup.py | 10 +-
 .../apache/kafka/tools/VerifiableProducer.java | 30 +-
 vagrant/base.sh | 26 +-
 34 files changed, 1561 insertions(+), 654 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/__init__.py
--
diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py
index 28d269b..e346811 100644
--- a/tests/kafkatest/__init__.py
+++ b/tests/kafkatest/__init__.py
@@ -14,3 +14,13 @@
 # limitations under the License.
 # see kafka.server.KafkaConfig for additional details and defaults
+# This determines the version of kafkatest that can be published to PyPi and installed with pip
+#
+# Note that in development, this version name can't follow Kafka's convention of having a trailing "-SNAPSHOT"
+# due to python version naming restrictions, which are enforced by python packaging tools
+# (see https://www.python.org/dev/peps/pep-0440/)
+#
+# Instead, in trunk, the version should have a suffix of the form ".devN"
+#
+# For example, when Kafka is at version 0.9.0.0-SNAPSHOT, this should be something like "0.9.0.0.dev0"
+__version__ = '0.9.0.0.dev0'

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/sanity_checks/test_console_consumer.py
--
diff --git
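
The version-naming rule described in the `__init__.py` comment (a trailing "-SNAPSHOT" becomes a ".devN" suffix) can be expressed as a short helper. This is an illustrative sketch only; `kafkatest_version` is a hypothetical function name and does not appear in the commit.

```python
def kafkatest_version(kafka_version, dev_number=0):
    """Map a Kafka version string to a PEP 440 compatible one.

    Hypothetical helper: "-SNAPSHOT" is replaced by ".devN"; any other
    version string is returned unchanged.
    """
    suffix = "-SNAPSHOT"
    if kafka_version.endswith(suffix):
        return "%s.dev%d" % (kafka_version[:-len(suffix)], dev_number)
    return kafka_version


if __name__ == "__main__":
    print(kafkatest_version("0.9.0.0-SNAPSHOT"))
```

For the example given in the comment, `kafkatest_version("0.9.0.0-SNAPSHOT")` yields the same string that the commit assigns to `__version__`.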
[1/2] kafka git commit: KAFKA-1888: rolling upgrade test
Repository: kafka
Updated Branches:
  refs/heads/trunk af42c3789 -> e6b343302

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/performance/jmx_mixin.py
--
diff --git a/tests/kafkatest/services/performance/jmx_mixin.py b/tests/kafkatest/services/performance/jmx_mixin.py
deleted file mode 100644
index 7e19839..000
--- a/tests/kafkatest/services/performance/jmx_mixin.py
+++ /dev/null
@@ -1,81 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-class JmxMixin(object):
-
-    def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=[]):
-        self.jmx_object_names = jmx_object_names
-        self.jmx_attributes = jmx_attributes
-        self.jmx_port = 9192
-
-        self.started = [False] * num_nodes
-        self.jmx_stats = [{} for x in range(num_nodes)]
-        self.maximum_jmx_value = {} # map from object_attribute_name to maximum value observed over time
-        self.average_jmx_value = {} # map from object_attribute_name to average value observed over time
-
-    def clean_node(self, node):
-        node.account.kill_process("jmx", clean_shutdown=False, allow_fail=True)
-        node.account.ssh("rm -rf /mnt/jmx_tool.log", allow_fail=False)
-
-    def start_jmx_tool(self, idx, node):
-        if self.started[idx-1] == True or self.jmx_object_names == None:
-            return
-        self.started[idx-1] = True
-
-        cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.JmxTool " \
-              "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % self.jmx_port
-        for jmx_object_name in self.jmx_object_names:
-            cmd += " --object-name %s" % jmx_object_name
-        for jmx_attribute in self.jmx_attributes:
-            cmd += " --attributes %s" % jmx_attribute
-        cmd += " | tee -a /mnt/jmx_tool.log"
-
-        self.logger.debug("Start JmxTool %d command: %s", idx, cmd)
-        jmx_output = node.account.ssh_capture(cmd, allow_fail=False)
-        jmx_output.next()
-
-    def read_jmx_output(self, idx, node):
-        if self.started[idx-1] == False:
-            return
-        self.maximum_jmx_value = {}
-        self.average_jmx_value = {}
-        object_attribute_names = []
-
-        cmd = "cat /mnt/jmx_tool.log"
-        self.logger.debug("Read jmx output %d command: %s", idx, cmd)
-        for line in node.account.ssh_capture(cmd, allow_fail=False):
-            if "time" in line:
-                object_attribute_names = line.strip()[1:-1].split("\",\"")[1:]
-                continue
-            stats = [float(field) for field in line.split(',')]
-            time_sec = int(stats[0]/1000)
-            self.jmx_stats[idx-1][time_sec] = {name : stats[i+1] for i, name in enumerate(object_attribute_names)}
-
-        # do not calculate average and maximum of jmx stats until we have read output from all nodes
-        if any(len(time_to_stats)==0 for time_to_stats in self.jmx_stats):
-            return
-
-        start_time_sec = min([min(time_to_stats.keys()) for time_to_stats in self.jmx_stats])
-        end_time_sec = max([max(time_to_stats.keys()) for time_to_stats in self.jmx_stats])
-
-        for name in object_attribute_names:
-            aggregates_per_time = []
-            for time_sec in xrange(start_time_sec, end_time_sec+1):
-                # assume that value is 0 if it is not read by jmx tool at the given time. This is appropriate for metrics such as bandwidth
-                values_per_node = [time_to_stats.get(time_sec, {}).get(name, 0) for time_to_stats in self.jmx_stats]
-                # assume that value is aggregated across nodes by sum. This is appropriate for metrics such as bandwidth
-                aggregates_per_time.append(sum(values_per_node))
-            self.average_jmx_value[name] = sum(aggregates_per_time)/len(aggregates_per_time)
-            self.maximum_jmx_value[name] = max(aggregates_per_time)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e6b34330/tests/kafkatest/services/performance/producer_performance.py
--
diff
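
The aggregation performed at the end of `read_jmx_output` in the deleted mixin (sum each metric across nodes per second, treat a missing sample as 0, then take the average and the maximum over time) can be isolated as a pure function. This is a sketch under assumptions: `aggregate_jmx_stats` is a hypothetical name, and the code is written to run on Python 3 as well as Python 2, unlike the `xrange`-based original.

```python
def aggregate_jmx_stats(jmx_stats, names):
    """Aggregate per-node JMX samples.

    jmx_stats: one dict per node, mapping time_sec -> {metric_name: value}.
    names: metric names to aggregate.
    Returns (average, maximum), two dicts keyed by metric name. Both are
    empty if any node has no samples yet, mirroring the early return in
    the mixin.
    """
    if any(len(time_to_stats) == 0 for time_to_stats in jmx_stats):
        return {}, {}

    start = min(min(t.keys()) for t in jmx_stats)
    end = max(max(t.keys()) for t in jmx_stats)

    average, maximum = {}, {}
    for name in names:
        per_time = []
        for time_sec in range(start, end + 1):
            # A node with no sample at this second contributes 0.
            per_time.append(sum(t.get(time_sec, {}).get(name, 0) for t in jmx_stats))
        average[name] = sum(per_time) / float(len(per_time))
        maximum[name] = max(per_time)
    return average, maximum
```

Summing across nodes and defaulting to 0 suits additive metrics such as bandwidth, as the original comments note; it would not be appropriate for metrics like latency percentiles.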
kafka git commit: KAFKA-2677: ensure consumer sees coordinator disconnects
Repository: kafka
Updated Branches:
  refs/heads/trunk e6b343302 -> 0b05d3b93

KAFKA-2677: ensure consumer sees coordinator disconnects

Author: Jason Gustafson
Reviewers: Ismael Juma, Guozhang Wang

Closes #349 from hachikuji/KAFKA-2677

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0b05d3b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0b05d3b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0b05d3b9

Branch: refs/heads/trunk
Commit: 0b05d3b939c5ed37a4253e7c3614d824e76ed664
Parents: e6b3433
Author: Jason Gustafson
Authored: Tue Oct 27 16:24:10 2015 -0700
Committer: Guozhang Wang
Committed: Tue Oct 27 16:24:10 2015 -0700

--
 .../org/apache/kafka/clients/NetworkClient.java | 37 ---
 .../consumer/internals/AbstractCoordinator.java | 14 +++-
 .../internals/ConsumerNetworkClient.java | 70 ++--
 .../org/apache/kafka/clients/MockClient.java | 15 +++--
 4 files changed, 83 insertions(+), 53 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b05d3b9/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
--
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 4265004..2c56751 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -12,16 +12,6 @@
  */
 package org.apache.kafka.clients;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.network.NetworkReceive;
@@ -40,6 +30,16 @@ import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
 /**
  * A network client for asynchronous request/response network i/o. This is an internal class used to implement the
  * user-facing producer and consumer clients.
@@ -58,6 +58,7 @@ public class NetworkClient implements KafkaClient {
     /* a list of nodes we've connected to in the past */
     private final List nodesEverSeen;
     private final Map nodesEverSeenById;
+
     /* random offset into nodesEverSeen list */
     private final Random randOffset;
@@ -234,16 +235,6 @@ public class NetworkClient implements KafkaClient {
     }
     /**
-     * Return the state of the connection to the given node
-     *
-     * @param node The node to check
-     * @return The connection state
-     */
-    public ConnectionState connectionState(String node) {
-        return connectionStates.connectionState(node);
-    }
-
-    /**
      * Queue up the given request for sending. Requests can only be sent out to ready nodes.
      *
      * @param request The request
@@ -275,7 +266,6 @@ public class NetworkClient implements KafkaClient {
     @Override
     public List poll(long timeout, long now) {
         long metadataTimeout = metadataUpdater.maybeUpdate(now);
-        long updatedNow = now;
         try {
             this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
         } catch (IOException e) {
@@ -283,7 +273,7 @@ public class NetworkClient implements KafkaClient {
         }
         // process completed actions
-        updatedNow = this.time.milliseconds();
+        long updatedNow = this.time.milliseconds();
         List responses = new ArrayList<>();
         handleCompletedSends(responses, updatedNow);
         handleCompletedReceives(responses, updatedNow);
@@ -612,9 +602,8 @@ public class NetworkClient implements KafkaClient {
      * @param nodes Current alive nodes
      */
     private void updateNodesEverSeen(List nodes) {
-        Node existing = null;
         for (Node n : nodes) {
-            existing = nodesEverSeenById.get(n.id());
+            Node existing = nodesEverSeenById.get(n.id());
             if (existing == null) {
                 nodesEverSeenById.put(n.id(), n);
                 log.debug("Adding node {} to nodes ever seen", n.id());
kafka git commit: HOTFIX: correct sourceNodes for kstream.through()
Repository: kafka Updated Branches: refs/heads/trunk 0b05d3b93 -> 13c3e049f HOTFIX: correct sourceNodes for kstream.through() guozhangwang Author: Yasuhiro Matsuda Reviewers: Guozhang Wang Closes #374 from ymatsuda/fix_through_operator Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/13c3e049 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/13c3e049 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/13c3e049 Branch: refs/heads/trunk Commit: 13c3e049fbf22522c90c2a0b4b4f680b974d9bea Parents: 0b05d3b Author: Yasuhiro Matsuda Authored: Tue Oct 27 16:26:47 2015 -0700 Committer: Guozhang Wang Committed: Tue Oct 27 16:26:47 2015 -0700 -- .../org/apache/kafka/streams/kstream/internals/KStreamImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/13c3e049/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java -- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 404193a..1a2297c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -178,7 +178,7 @@ public class KStreamImpl implements KStream { topology.addSource(sourceName, keyDeserializer, valDeserializer, topic); -return new KStreamImpl<>(topology, sourceName, Collections.emptySet()); +return new KStreamImpl<>(topology, sourceName, Collections.singleton(sourceName)); } @Override
kafka git commit: KAFKA-2683: ensure wakeup exceptions raised to user
Repository: kafka Updated Branches: refs/heads/trunk 13c3e049f -> 1ac2640f8 KAFKA-2683: ensure wakeup exceptions raised to user Author: Jason GustafsonReviewers: Ewen Cheslack-Postava, Guozhang Wang Closes #366 from hachikuji/KAFKA-2683 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1ac2640f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1ac2640f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1ac2640f Branch: refs/heads/trunk Commit: 1ac2640f8095262f423c770060b737f81652e211 Parents: 13c3e04 Author: Jason Gustafson Authored: Tue Oct 27 17:39:19 2015 -0700 Committer: Guozhang Wang Committed: Tue Oct 27 17:39:19 2015 -0700 -- .../consumer/ConsumerWakeupException.java | 20 --- .../kafka/clients/consumer/KafkaConsumer.java | 26 ++-- .../kafka/clients/consumer/MockConsumer.java| 3 ++- .../consumer/internals/AbstractCoordinator.java | 26 +++- .../consumer/internals/ConsumerCoordinator.java | 10 +--- .../internals/ConsumerNetworkClient.java| 10 .../kafka/common/errors/WakeupException.java| 26 .../internals/ConsumerNetworkClientTest.java| 4 +-- .../kafka/copycat/runtime/WorkerSinkTask.java | 3 ++- .../runtime/distributed/DistributedHerder.java | 6 ++--- .../runtime/distributed/WorkerGroupMember.java | 2 +- .../kafka/copycat/util/KafkaBasedLog.java | 10 12 files changed, 81 insertions(+), 65 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java deleted file mode 100644 index 35f1ec9..000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java +++ /dev/null @@ -1,20 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor 
license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.kafka.clients.consumer; - -import org.apache.kafka.common.KafkaException; - -public class ConsumerWakeupException extends KafkaException { -private static final long serialVersionUID = 1L; - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 06a9239..7aef8a3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -313,9 +313,9 @@ import java.util.regex.Pattern; * * * The only exception to this rule is {@link #wakeup()}, which can safely be used from an external thread to - * interrupt an active operation. In this case, a {@link ConsumerWakeupException} will be thrown from the thread - * blocking on the operation. This can be used to shutdown the consumer from another thread. The following - * snippet shows the typical pattern: + * interrupt an active operation. 
In this case, a {@link org.apache.kafka.common.errors.WakeupException} will be + * thrown from the thread blocking on the operation. This can be used to shutdown the consumer from another thread. + * The following snippet shows the typical pattern: * * * public class KafkaConsumerRunner implements Runnable { @@ -329,7 +329,7 @@ import java.util.regex.Pattern; * ConsumerRecords records = consumer.poll(1); * // Handle new records * } - * } catch (ConsumerWakeupException e) { + * } catch (WakeupException e) { *
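The javadoc snippet in the diff above is cut off in this archive. As a rough illustration of the shutdown pattern it describes, here is a minimal, self-contained sketch. It does not use the Kafka client: `StubConsumer` and the local `WakeupException` are hypothetical stand-ins for `KafkaConsumer` and `org.apache.kafka.common.errors.WakeupException`, and `ConsumerRunner` is an assumed name, so treat this as one possible shape of the idea rather than the text of the commit.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Local stand-in for org.apache.kafka.common.errors.WakeupException (assumption, not the real class).
class WakeupException extends RuntimeException {
    private static final long serialVersionUID = 1L;
}

// Hypothetical stub: poll() blocks until the timeout elapses or wakeup() is called from another thread.
class StubConsumer {
    private final Object lock = new Object();
    private boolean wakeupRequested = false;
    private volatile boolean closed = false;

    void poll(long timeoutMs) {
        synchronized (lock) {
            try {
                if (!wakeupRequested) {
                    lock.wait(timeoutMs);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (wakeupRequested) {
                wakeupRequested = false;
                // Raised on the thread that is blocked in poll(), not on the caller of wakeup().
                throw new WakeupException();
            }
        }
    }

    // The only method meant to be called from a thread other than the polling thread.
    void wakeup() {
        synchronized (lock) {
            wakeupRequested = true;
            lock.notifyAll();
        }
    }

    void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

class ConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final StubConsumer consumer;

    ConsumerRunner(StubConsumer consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        try {
            while (!closed.get()) {
                consumer.poll(10000);
                // Handle new records here.
            }
        } catch (WakeupException e) {
            // Ignore the exception when shutting down; rethrow if it was unexpected.
            if (!closed.get()) {
                throw e;
            }
        } finally {
            consumer.close();
        }
    }

    // Safe to call from a separate thread: sets the flag, then interrupts the blocking poll().
    void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}
```

Setting the flag before calling `wakeup()` matters in this sketch: the polling thread uses the flag to tell an intentional shutdown apart from an unexpected wakeup.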
kafka git commit: MINOR: Expose ReplicaManager gauges
Repository: kafka Updated Branches: refs/heads/trunk 1ac2640f8 -> 7a36d3647 MINOR: Expose ReplicaManager gauges There are several gauges in core that are registered but cannot be accessed programmatically. For example, the gauges "LeaderCount", "PartitionCount", and "UnderReplicatedPartitions" are all registered in ReplicaManager.scala, but there is no way to access them programmatically even if one has access to the kafka.server object. Other metrics, such as isrExpandRate (also in ReplicaManager.scala), can be accessed. The solution here is trivial: assign the result of newGauge to a val, as shown below val partitionCount = newGauge( "PartitionCount", new Gauge[Int] { def value = allPartitions.size } ) Author: Eno Thereska Reviewers: Ismael Juma, Guozhang Wang Closes #364 from enothereska/gauges Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7a36d364 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7a36d364 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7a36d364 Branch: refs/heads/trunk Commit: 7a36d36478635ae16b64c6410b88c92b45d5f129 Parents: 1ac2640 Author: Eno Thereska Authored: Tue Oct 27 17:55:59 2015 -0700 Committer: Guozhang Wang Committed: Tue Oct 27 17:55:59 2015 -0700 -- core/src/main/scala/kafka/server/ReplicaManager.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/7a36d364/core/src/main/scala/kafka/server/ReplicaManager.scala -- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 1fc47f4..0413b1a 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -122,7 +122,7 @@ class ReplicaManager(val config: KafkaConfig, val delayedFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]( purgatoryName = "Fetch", config.brokerId, config.fetchPurgatoryPurgeIntervalRequests) - 
newGauge( + val leaderCount = newGauge( "LeaderCount", new Gauge[Int] { def value = { @@ -130,13 +130,13 @@ class ReplicaManager(val config: KafkaConfig, } } ) - newGauge( + val partitionCount = newGauge( "PartitionCount", new Gauge[Int] { def value = allPartitions.size } ) - newGauge( + val underReplicatedPartitions = newGauge( "UnderReplicatedPartitions", new Gauge[Int] { def value = underReplicatedPartitionCount()
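The idea behind this change can be shown outside of Kafka. The sketch below is a Java analogue, not the Scala code from the commit: `GaugeRegistry`, `ReplicaManagerSketch`, and `addPartition` are hypothetical names, and `Supplier<Integer>` stands in for the metrics library's `Gauge[Int]`. It only illustrates that a gauge whose registration result is stored in a member can be read by anyone holding the owning object, while a gauge whose result is discarded cannot.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical minimal registry; this is not the metrics API used by Kafka.
class GaugeRegistry {
    private final Map<String, Supplier<Integer>> gauges = new LinkedHashMap<>();

    // Registers the gauge under the given name and returns it so the caller can keep a reference.
    Supplier<Integer> newGauge(String name, Supplier<Integer> gauge) {
        gauges.put(name, gauge);
        return gauge;
    }
}

// Hypothetical stand-in for a class that owns some state and publishes gauges about it.
class ReplicaManagerSketch {
    private final Set<String> allPartitions = new HashSet<>();
    private final GaugeRegistry registry = new GaugeRegistry();

    // Storing the return value in a field is what makes the gauge reachable from outside.
    // Calling registry.newGauge(...) without the assignment would register it but expose nothing.
    final Supplier<Integer> partitionCount =
            registry.newGauge("PartitionCount", () -> allPartitions.size());

    void addPartition(String topicPartition) {
        allPartitions.add(topicPartition);
    }
}
```

A caller holding a `ReplicaManagerSketch` can then read the live value with `partitionCount.get()`, which is the kind of programmatic access the commit message asks for.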