This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 0.11.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/0.11.0 by this push: new 3792111 MINOR: Update streams upgrade system tests 0.11.0.3 (#5613) 3792111 is described below commit 379211134740268b570fc8edd59ae78df0dffee9 Author: Bill Bejeck <bbej...@gmail.com> AuthorDate: Thu Sep 6 16:06:35 2018 -0400 MINOR: Update streams upgrade system tests 0.11.0.3 (#5613) This is a port of #5605 for the 11.3 branch Reviewers: John Roesler <j...@confluent.io>, Matthias J. Sax <matth...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- bin/kafka-run-class.sh | 2 +- gradle/dependencies.gradle | 2 +- .../processor/internals/RecordCollectorImpl.java | 8 ++++---- .../processor/internals/RecordCollectorTest.java | 21 ++++++++++----------- tests/docker/Dockerfile | 2 +- .../kafkatest/tests/streams/streams_upgrade_test.py | 11 +++++++---- tests/kafkatest/version.py | 6 ++++-- vagrant/base.sh | 6 +++--- 8 files changed, 31 insertions(+), 27 deletions(-) diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index 8e2ba91..9168bb3 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -111,7 +111,7 @@ else for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar; do if should_include_file "$file"; then - CLASSPATH="$CLASSPATH":"$file" + CLASSPATH="$file":"$CLASSPATH" fi done fi diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 17c1eb8..958b25d 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -58,7 +58,7 @@ versions += [ jersey: "2.24", kafka_0100: "0.10.0.1", kafka_0101: "0.10.1.1", - kafka_0102: "0.10.2.1", + kafka_0102: "0.10.2.2", log4j: "1.2.17", jopt: "5.0.3", junit: "4.12", diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 24fb661..38c2e39 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -16,10 +16,14 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; @@ -31,10 +35,6 @@ import org.apache.kafka.streams.processor.StreamPartitioner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - public class RecordCollectorImpl implements RecordCollector { private static final int MAX_SEND_ATTEMPTS = 3; private static final long SEND_RETRY_BACKOFF = 100L; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index 7876d0f..8ea8d49 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -16,6 +16,15 @@ */ package org.apache.kafka.streams.processor.internals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -33,16 +42,6 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StreamPartitioner; import org.junit.Test; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - public class RecordCollectorTest { private final List<PartitionInfo> infos = Arrays.asList( @@ -148,7 +147,7 @@ public class RecordCollectorTest { public void shouldUnwrapAndThrowProducerFencedExceptionFromCallToSend() { final MockProducer<byte[], byte[]> producer = new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer); - final RecordCollector collector = new RecordCollectorImpl(producer, "test", logContext); + final RecordCollector collector = new RecordCollectorImpl(producer, "test"); producer.initTransactions(); producer.fenceProducer(); diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index 0cf123c..05493c8 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -45,7 +45,7 @@ RUN mkdir -p "/opt/kafka-0.8.2.2" && curl -s "${MIRROR}kafka/0.8.2.2/kafka_2.10- RUN mkdir -p "/opt/kafka-0.9.0.1" && curl -s "${MIRROR}kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1" RUN mkdir -p "/opt/kafka-0.10.0.1" && curl -s "${MIRROR}kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1" RUN mkdir -p "/opt/kafka-0.10.1.1" && curl -s "${MIRROR}kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1" -RUN mkdir -p "/opt/kafka-0.10.2.1" && curl -s "${MIRROR}kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1" +RUN mkdir -p "/opt/kafka-0.10.2.2" && curl -s "${MIRROR}kafka/0.10.2.1/kafka_2.11-0.10.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.2" # Set up the ducker user. RUN useradd -ms /bin/bash ducker && mkdir -p /home/ducker/ && rsync -aiq /root/.ssh/ /home/ducker/.ssh && chown -R ducker /home/ducker/ /mnt/ && echo 'ducker ALL=(ALL) NOPASSWD: ALL' >> /etc/sudoers diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index 7aa2de6..8a87922 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -13,11 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import random from ducktape.mark import parametrize +from kafkatest.services.streams import StreamsSmokeTestDriverService, \ + StreamsUpgradeTestJobRunnerService from kafkatest.tests.kafka_test import KafkaTest -from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsUpgradeTestJobRunnerService -from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, DEV_VERSION -import random +from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, \ + DEV_VERSION + class StreamsUpgradeTest(KafkaTest): """ @@ -76,7 +79,7 @@ class StreamsUpgradeTest(KafkaTest): self.driver.stop() #@parametrize(new_version=str(LATEST_0_10_1)) we cannot run this test until Kafka 0.10.1.2 is released - #@parametrize(new_version=str(LATEST_0_10_2)) we cannot run this test until Kafka 0.10.2.2 is released + @parametrize(new_version=str(LATEST_0_10_2)) @parametrize(new_version=str(DEV_VERSION)) def test_metadata_upgrade(self, new_version): """ diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index e525e8c..f8b4b6a 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -86,7 +86,8 @@ LATEST_0_10_1 = V_0_10_1_1 # 0.10.2.x versions V_0_10_2_0 = KafkaVersion("0.10.2.0") V_0_10_2_1 = KafkaVersion("0.10.2.1") -LATEST_0_10_2 = V_0_10_2_1 +V_0_10_2_2 = KafkaVersion("0.10.2.2") +LATEST_0_10_2 = V_0_10_2_2 LATEST_0_10 = LATEST_0_10_2 @@ -94,5 +95,6 @@ LATEST_0_10 = LATEST_0_10_2 V_0_11_0_0 = KafkaVersion("0.11.0.0") V_0_11_0_1 = KafkaVersion("0.11.0.1") V_0_11_0_2 = KafkaVersion("0.11.0.2") -LATEST_0_11_0 = V_0_11_0_2 +V_0_11_0_3 = KafkaVersion("0.11.0.3") +LATEST_0_11_0 = V_0_11_0_3 LATEST_0_11 = LATEST_0_11_0 diff --git a/vagrant/base.sh b/vagrant/base.sh index 3d16254..3a9a29b 100755 --- a/vagrant/base.sh +++ b/vagrant/base.sh @@ -63,7 +63,7 @@ get_kafka() { scala_version=$2 kafka_dir=/opt/kafka-$version - url=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka_$scala_version-$version.tgz + url=https://s3-us-west-2.amazonaws.com/kafka-packages/kafka_$scala_version-$version.tgz # the .tgz above does not include the streams test jar hence we need to get it separately url_streams_test=https://s3-us-west-2.amazonaws.com/kafka-packages/kafka-streams-$version-test.jar if [ ! -d /opt/kafka-$version ]; then @@ -88,8 +88,8 @@ get_kafka 0.10.0.1 2.11 chmod a+rw /opt/kafka-0.10.0.1 get_kafka 0.10.1.1 2.11 chmod a+rw /opt/kafka-0.10.1.1 -get_kafka 0.10.2.1 2.11 -chmod a+rw /opt/kafka-0.10.2.1 +get_kafka 0.10.2.2 2.11 +chmod a+rw /opt/kafka-0.10.2.2 # For EC2 nodes, we want to use /mnt, which should have the local disk. On local