Repository: samza Updated Branches: refs/heads/master aa0586558 -> f7b0d3834
SAMZA-1602: Moving class StreamAssert from src/test to src/main Tested with ./gradlew clean build, which is passing Author: sanil15 <sanil.jai...@gmail.com> Reviewers: Xinyu Liu <xinyuliu...@gmail.com> Closes #439 from Sanil15/SAMZA-1602 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f7b0d383 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f7b0d383 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f7b0d383 Branch: refs/heads/master Commit: f7b0d38340ae4485f0ad91c2d2a7ecb7ff2ecd7c Parents: aa05865 Author: sanil15 <sanil.jai...@gmail.com> Authored: Wed Mar 7 13:22:40 2018 -0800 Committer: xiliu <xi...@linkedin.com> Committed: Wed Mar 7 13:22:40 2018 -0800 ---------------------------------------------------------------------- build.gradle | 4 +- .../samza/test/framework/StreamAssert.java | 167 +++++++++++++++++++ .../samza/test/operator/BroadcastAssertApp.java | 2 +- ...StreamApplicationIntegrationTestHarness.java | 2 +- .../apache/samza/test/timer/TestTimerApp.java | 2 +- .../apache/samza/test/util/StreamAssert.java | 167 ------------------- 6 files changed, 172 insertions(+), 172 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/f7b0d383/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index a15a456..44a6ccd 100644 --- a/build.gradle +++ b/build.gradle @@ -761,16 +761,16 @@ project(":samza-test_$scalaVersion") { compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion" compile "javax.mail:mail:1.4" compile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion" + compile "junit:junit:$junitVersion" + compile "org.hamcrest:hamcrest-all:$hamcrestVersion" testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test" testCompile "com.101tec:zkclient:$zkClientVersion" - testCompile "junit:junit:$junitVersion" testCompile project(":samza-kafka_$scalaVersion") testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test" testCompile "org.apache.kafka:kafka-clients:$kafkaVersion:test" testCompile project(":samza-core_$scalaVersion").sourceSets.test.output testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion" testCompile "org.mockito:mockito-core:$mockitoVersion" - testCompile "org.hamcrest:hamcrest-all:$hamcrestVersion" testRuntime "org.slf4j:slf4j-simple:$slf4jVersion" } http://git-wip-us.apache.org/repos/asf/samza/blob/f7b0d383/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java new file mode 100644 index 0000000..de0d962 --- /dev/null +++ b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.framework; + +import com.google.common.collect.Iterables; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; +import org.hamcrest.Matchers; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.assertThat; + +/** + * An assertion on the content of a {@link MessageStream}. + * + * <pre>Example: {@code + * MessageStream<String> stream = streamGraph.getInputStream("input", serde).map(some_function)...; + * ... + * StreamAssert.that(id, stream, stringSerde).containsInAnyOrder(Arrays.asList("a", "b", "c")); + * }</pre> + * + */ +public class StreamAssert<M> { + private final static Map<String, CountDownLatch> LATCHES = new ConcurrentHashMap<>(); + private final static CountDownLatch PLACE_HOLDER = new CountDownLatch(0); + + private final String id; + private final MessageStream<M> messageStream; + private final Serde<M> serde; + private boolean checkEachTask = false; + + public static <M> StreamAssert<M> that(String id, MessageStream<M> messageStream, Serde<M> serde) { + return new StreamAssert<>(id, messageStream, serde); + } + + private StreamAssert(String id, MessageStream<M> messageStream, Serde<M> serde) { + this.id = id; + this.messageStream = messageStream; + this.serde = serde; + } + + public StreamAssert forEachTask() { + checkEachTask = true; + return this; + } + + public void containsInAnyOrder(final Collection<M> expected) { + LATCHES.putIfAbsent(id, PLACE_HOLDER); + final MessageStream<M> streamToCheck = checkEachTask + ? messageStream + : messageStream + .partitionBy(m -> null, m -> m, KVSerde.of(new StringSerde(), serde), null) + .map(kv -> kv.value); + + streamToCheck.sink(new CheckAgainstExpected<M>(id, expected, checkEachTask)); + } + + public static void waitForComplete() { + try { + while (!LATCHES.isEmpty()) { + final Set<String> ids = new HashSet<>(LATCHES.keySet()); + for (String id : ids) { + while (LATCHES.get(id) == PLACE_HOLDER) { + Thread.sleep(100); + } + + final CountDownLatch latch = LATCHES.get(id); + if (latch != null) { + latch.await(); + LATCHES.remove(id); + } + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static final class CheckAgainstExpected<M> implements SinkFunction<M> { + private static final long TIMEOUT = 5000L; + + private final String id; + private final boolean checkEachTask; + private final Collection<M> expected; + + + private transient Timer timer = new Timer(); + private transient List<M> actual = Collections.synchronizedList(new ArrayList<>()); + private transient TimerTask timerTask = new TimerTask() { + @Override + public void run() { + check(); + } + }; + + CheckAgainstExpected(String id, Collection<M> expected, boolean checkEachTask) { + this.id = id; + this.expected = expected; + this.checkEachTask = checkEachTask; + } + + @Override + public void init(Config config, TaskContext context) { + final SystemStreamPartition ssp = Iterables.getFirst(context.getSystemStreamPartitions(), null); + if (ssp == null ? false : ssp.getPartition().getPartitionId() == 0) { + final int count = checkEachTask ? context.getSamzaContainerContext().taskNames.size() : 1; + LATCHES.put(id, new CountDownLatch(count)); + timer.schedule(timerTask, TIMEOUT); + } + } + + @Override + public void apply(M message, MessageCollector messageCollector, TaskCoordinator taskCoordinator) { + actual.add(message); + + if (actual.size() >= expected.size()) { + timerTask.cancel(); + check(); + } + } + + private void check() { + final CountDownLatch latch = LATCHES.get(id); + try { + assertThat(actual, Matchers.containsInAnyOrder((M[]) expected.toArray())); + } finally { + latch.countDown(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/f7b0d383/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java index 9c89aba..426a53b 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java @@ -25,7 +25,7 @@ import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.StreamGraph; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.test.operator.data.PageView; -import org.apache.samza.test.util.StreamAssert; +import org.apache.samza.test.framework.StreamAssert; import java.util.Arrays; http://git-wip-us.apache.org/repos/asf/samza/blob/f7b0d383/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java index 04497bd..fae3db4 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java @@ -32,7 +32,7 @@ import org.apache.samza.config.KafkaConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.test.harness.AbstractIntegrationTestHarness; -import org.apache.samza.test.util.StreamAssert; +import org.apache.samza.test.framework.StreamAssert; import scala.Option; import scala.Option$; http://git-wip-us.apache.org/repos/asf/samza/blob/f7b0d383/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java b/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java index d8913c5..27d1063 100644 --- a/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java @@ -28,7 +28,7 @@ import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.test.operator.data.PageView; -import org.apache.samza.test.util.StreamAssert; +import org.apache.samza.test.framework.StreamAssert; import java.util.ArrayList; import java.util.Arrays; http://git-wip-us.apache.org/repos/asf/samza/blob/f7b0d383/samza-test/src/test/java/org/apache/samza/test/util/StreamAssert.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/util/StreamAssert.java b/samza-test/src/test/java/org/apache/samza/test/util/StreamAssert.java deleted file mode 100644 index 8a46db0..0000000 --- a/samza-test/src/test/java/org/apache/samza/test/util/StreamAssert.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.test.util; - -import com.google.common.collect.Iterables; -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.functions.SinkFunction; -import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.Serde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; -import org.hamcrest.Matchers; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; - -import static org.junit.Assert.assertThat; - -/** - * An assertion on the content of a {@link MessageStream}. - * - * <p>Example: </pre>{@code - * MessageStream<String> stream = streamGraph.getInputStream("input", serde).map(some_function)...; - * ... - * StreamAssert.that(id, stream, stringSerde).containsInAnyOrder(Arrays.asList("a", "b", "c")); - * }</pre> - * - */ -public class StreamAssert<M> { - private final static Map<String, CountDownLatch> LATCHES = new ConcurrentHashMap<>(); - private final static CountDownLatch PLACE_HOLDER = new CountDownLatch(0); - - private final String id; - private final MessageStream<M> messageStream; - private final Serde<M> serde; - private boolean checkEachTask = false; - - public static <M> StreamAssert<M> that(String id, MessageStream<M> messageStream, Serde<M> serde) { - return new StreamAssert<>(id, messageStream, serde); - } - - private StreamAssert(String id, MessageStream<M> messageStream, Serde<M> serde) { - this.id = id; - this.messageStream = messageStream; - this.serde = serde; - } - - public StreamAssert forEachTask() { - checkEachTask = true; - return this; - } - - public void containsInAnyOrder(final Collection<M> expected) { - LATCHES.putIfAbsent(id, PLACE_HOLDER); - final MessageStream<M> streamToCheck = checkEachTask - ? messageStream - : messageStream - .partitionBy(m -> null, m -> m, KVSerde.of(new StringSerde(), serde), null) - .map(kv -> kv.value); - - streamToCheck.sink(new CheckAgainstExpected<M>(id, expected, checkEachTask)); - } - - public static void waitForComplete() { - try { - while (!LATCHES.isEmpty()) { - final Set<String> ids = new HashSet<>(LATCHES.keySet()); - for (String id : ids) { - while (LATCHES.get(id) == PLACE_HOLDER) { - Thread.sleep(100); - } - - final CountDownLatch latch = LATCHES.get(id); - if (latch != null) { - latch.await(); - LATCHES.remove(id); - } - } - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private static final class CheckAgainstExpected<M> implements SinkFunction<M> { - private static final long TIMEOUT = 5000L; - - private final String id; - private final boolean checkEachTask; - private final Collection<M> expected; - - - private transient Timer timer = new Timer(); - private transient List<M> actual = Collections.synchronizedList(new ArrayList<>()); - private transient TimerTask timerTask = new TimerTask() { - @Override - public void run() { - check(); - } - }; - - CheckAgainstExpected(String id, Collection<M> expected, boolean checkEachTask) { - this.id = id; - this.expected = expected; - this.checkEachTask = checkEachTask; - } - - @Override - public void init(Config config, TaskContext context) { - final SystemStreamPartition ssp = Iterables.getFirst(context.getSystemStreamPartitions(), null); - if (ssp == null ? false : ssp.getPartition().getPartitionId() == 0) { - final int count = checkEachTask ? context.getSamzaContainerContext().taskNames.size() : 1; - LATCHES.put(id, new CountDownLatch(count)); - timer.schedule(timerTask, TIMEOUT); - } - } - - @Override - public void apply(M message, MessageCollector messageCollector, TaskCoordinator taskCoordinator) { - actual.add(message); - - if (actual.size() >= expected.size()) { - timerTask.cancel(); - check(); - } - } - - private void check() { - final CountDownLatch latch = LATCHES.get(id); - try { - assertThat(actual, Matchers.containsInAnyOrder((M[]) expected.toArray())); - } finally { - latch.countDown(); - } - } - } -}