Repository: samza
Updated Branches:
  refs/heads/master aa0586558 -> f7b0d3834


SAMZA-1602: Moving class StreamAssert from src/test to src/main

Tested with ./gradlew clean build, which is passing

Author: sanil15 <sanil.jai...@gmail.com>

Reviewers: Xinyu Liu <xinyuliu...@gmail.com>

Closes #439 from Sanil15/SAMZA-1602


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f7b0d383
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f7b0d383
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f7b0d383

Branch: refs/heads/master
Commit: f7b0d38340ae4485f0ad91c2d2a7ecb7ff2ecd7c
Parents: aa05865
Author: sanil15 <sanil.jai...@gmail.com>
Authored: Wed Mar 7 13:22:40 2018 -0800
Committer: xiliu <xi...@linkedin.com>
Committed: Wed Mar 7 13:22:40 2018 -0800

----------------------------------------------------------------------
 build.gradle                                    |   4 +-
 .../samza/test/framework/StreamAssert.java      | 167 +++++++++++++++++++
 .../samza/test/operator/BroadcastAssertApp.java |   2 +-
 ...StreamApplicationIntegrationTestHarness.java |   2 +-
 .../apache/samza/test/timer/TestTimerApp.java   |   2 +-
 .../apache/samza/test/util/StreamAssert.java    | 167 -------------------
 6 files changed, 172 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f7b0d383/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index a15a456..44a6ccd 100644
--- a/build.gradle
+++ b/build.gradle
@@ -761,16 +761,16 @@ project(":samza-test_$scalaVersion") {
     compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"
     compile "javax.mail:mail:1.4"
     compile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion"
+    compile "junit:junit:$junitVersion"
+    compile "org.hamcrest:hamcrest-all:$hamcrestVersion"
     testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
     testCompile "com.101tec:zkclient:$zkClientVersion"
-    testCompile "junit:junit:$junitVersion"
     testCompile project(":samza-kafka_$scalaVersion")
     testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
     testCompile "org.apache.kafka:kafka-clients:$kafkaVersion:test"
     testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
     testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
     testCompile "org.mockito:mockito-core:$mockitoVersion"
-    testCompile "org.hamcrest:hamcrest-all:$hamcrestVersion"
     testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f7b0d383/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
new file mode 100644
index 0000000..de0d962
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework;
+
+import com.google.common.collect.Iterables;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.hamcrest.Matchers;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * An assertion on the content of a {@link MessageStream}.
+ *
+ * <pre>Example: {@code
+ * MessageStream<String> stream = streamGraph.getInputStream("input", 
serde).map(some_function)...;
+ * ...
+ * StreamAssert.that(id, stream, 
stringSerde).containsInAnyOrder(Arrays.asList("a", "b", "c"));
+ * }</pre>
+ *
+ */
+public class StreamAssert<M> {
+  private final static Map<String, CountDownLatch> LATCHES = new 
ConcurrentHashMap<>();
+  private final static CountDownLatch PLACE_HOLDER = new CountDownLatch(0);
+
+  private final String id;
+  private final MessageStream<M> messageStream;
+  private final Serde<M> serde;
+  private boolean checkEachTask = false;
+
+  public static <M> StreamAssert<M> that(String id, MessageStream<M> 
messageStream, Serde<M> serde) {
+    return new StreamAssert<>(id, messageStream, serde);
+  }
+
+  private StreamAssert(String id, MessageStream<M> messageStream, Serde<M> 
serde) {
+    this.id = id;
+    this.messageStream = messageStream;
+    this.serde = serde;
+  }
+
+  public StreamAssert forEachTask() {
+    checkEachTask = true;
+    return this;
+  }
+
+  public void containsInAnyOrder(final Collection<M> expected) {
+    LATCHES.putIfAbsent(id, PLACE_HOLDER);
+    final MessageStream<M> streamToCheck = checkEachTask
+        ? messageStream
+        : messageStream
+          .partitionBy(m -> null, m -> m, KVSerde.of(new StringSerde(), 
serde), null)
+          .map(kv -> kv.value);
+
+    streamToCheck.sink(new CheckAgainstExpected<M>(id, expected, 
checkEachTask));
+  }
+
+  public static void waitForComplete() {
+    try {
+      while (!LATCHES.isEmpty()) {
+        final Set<String> ids  = new HashSet<>(LATCHES.keySet());
+        for (String id : ids) {
+          while (LATCHES.get(id) == PLACE_HOLDER) {
+            Thread.sleep(100);
+          }
+
+          final CountDownLatch latch = LATCHES.get(id);
+          if (latch != null) {
+            latch.await();
+            LATCHES.remove(id);
+          }
+        }
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static final class CheckAgainstExpected<M> implements 
SinkFunction<M> {
+    private static final long TIMEOUT = 5000L;
+
+    private final String id;
+    private final boolean checkEachTask;
+    private final Collection<M> expected;
+
+
+    private transient Timer timer = new Timer();
+    private transient List<M> actual = Collections.synchronizedList(new 
ArrayList<>());
+    private transient TimerTask timerTask = new TimerTask() {
+      @Override
+      public void run() {
+        check();
+      }
+    };
+
+    CheckAgainstExpected(String id, Collection<M> expected, boolean 
checkEachTask) {
+      this.id = id;
+      this.expected = expected;
+      this.checkEachTask = checkEachTask;
+    }
+
+    @Override
+    public void init(Config config, TaskContext context) {
+      final SystemStreamPartition ssp = 
Iterables.getFirst(context.getSystemStreamPartitions(), null);
+      if (ssp == null ? false : ssp.getPartition().getPartitionId() == 0) {
+        final int count = checkEachTask ? 
context.getSamzaContainerContext().taskNames.size() : 1;
+        LATCHES.put(id, new CountDownLatch(count));
+        timer.schedule(timerTask, TIMEOUT);
+      }
+    }
+
+    @Override
+    public void apply(M message, MessageCollector messageCollector, 
TaskCoordinator taskCoordinator) {
+      actual.add(message);
+
+      if (actual.size() >= expected.size()) {
+        timerTask.cancel();
+        check();
+      }
+    }
+
+    private void check() {
+      final CountDownLatch latch = LATCHES.get(id);
+      try {
+        assertThat(actual, Matchers.containsInAnyOrder((M[]) 
expected.toArray()));
+      } finally {
+        latch.countDown();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f7b0d383/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java
index 9c89aba..426a53b 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/BroadcastAssertApp.java
@@ -25,7 +25,7 @@ import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.test.operator.data.PageView;
-import org.apache.samza.test.util.StreamAssert;
+import org.apache.samza.test.framework.StreamAssert;
 
 import java.util.Arrays;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f7b0d383/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
index 04497bd..fae3db4 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
@@ -32,7 +32,7 @@ import org.apache.samza.config.KafkaConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
-import org.apache.samza.test.util.StreamAssert;
+import org.apache.samza.test.framework.StreamAssert;
 import scala.Option;
 import scala.Option$;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/f7b0d383/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java b/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java
index d8913c5..27d1063 100644
--- a/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java
@@ -28,7 +28,7 @@ import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.test.operator.data.PageView;
-import org.apache.samza.test.util.StreamAssert;
+import org.apache.samza.test.framework.StreamAssert;
 
 import java.util.ArrayList;
 import java.util.Arrays;

http://git-wip-us.apache.org/repos/asf/samza/blob/f7b0d383/samza-test/src/test/java/org/apache/samza/test/util/StreamAssert.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/util/StreamAssert.java b/samza-test/src/test/java/org/apache/samza/test/util/StreamAssert.java
deleted file mode 100644
index 8a46db0..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/util/StreamAssert.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.util;
-
-import com.google.common.collect.Iterables;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.hamcrest.Matchers;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-import static org.junit.Assert.assertThat;
-
-/**
- * An assertion on the content of a {@link MessageStream}.
- *
- * <p>Example: </pre>{@code
- * MessageStream<String> stream = streamGraph.getInputStream("input", 
serde).map(some_function)...;
- * ...
- * StreamAssert.that(id, stream, 
stringSerde).containsInAnyOrder(Arrays.asList("a", "b", "c"));
- * }</pre>
- *
- */
-public class StreamAssert<M> {
-  private final static Map<String, CountDownLatch> LATCHES = new 
ConcurrentHashMap<>();
-  private final static CountDownLatch PLACE_HOLDER = new CountDownLatch(0);
-
-  private final String id;
-  private final MessageStream<M> messageStream;
-  private final Serde<M> serde;
-  private boolean checkEachTask = false;
-
-  public static <M> StreamAssert<M> that(String id, MessageStream<M> 
messageStream, Serde<M> serde) {
-    return new StreamAssert<>(id, messageStream, serde);
-  }
-
-  private StreamAssert(String id, MessageStream<M> messageStream, Serde<M> 
serde) {
-    this.id = id;
-    this.messageStream = messageStream;
-    this.serde = serde;
-  }
-
-  public StreamAssert forEachTask() {
-    checkEachTask = true;
-    return this;
-  }
-
-  public void containsInAnyOrder(final Collection<M> expected) {
-    LATCHES.putIfAbsent(id, PLACE_HOLDER);
-    final MessageStream<M> streamToCheck = checkEachTask
-        ? messageStream
-        : messageStream
-          .partitionBy(m -> null, m -> m, KVSerde.of(new StringSerde(), 
serde), null)
-          .map(kv -> kv.value);
-
-    streamToCheck.sink(new CheckAgainstExpected<M>(id, expected, 
checkEachTask));
-  }
-
-  public static void waitForComplete() {
-    try {
-      while (!LATCHES.isEmpty()) {
-        final Set<String> ids  = new HashSet<>(LATCHES.keySet());
-        for (String id : ids) {
-          while (LATCHES.get(id) == PLACE_HOLDER) {
-            Thread.sleep(100);
-          }
-
-          final CountDownLatch latch = LATCHES.get(id);
-          if (latch != null) {
-            latch.await();
-            LATCHES.remove(id);
-          }
-        }
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private static final class CheckAgainstExpected<M> implements 
SinkFunction<M> {
-    private static final long TIMEOUT = 5000L;
-
-    private final String id;
-    private final boolean checkEachTask;
-    private final Collection<M> expected;
-
-
-    private transient Timer timer = new Timer();
-    private transient List<M> actual = Collections.synchronizedList(new 
ArrayList<>());
-    private transient TimerTask timerTask = new TimerTask() {
-      @Override
-      public void run() {
-        check();
-      }
-    };
-
-    CheckAgainstExpected(String id, Collection<M> expected, boolean 
checkEachTask) {
-      this.id = id;
-      this.expected = expected;
-      this.checkEachTask = checkEachTask;
-    }
-
-    @Override
-    public void init(Config config, TaskContext context) {
-      final SystemStreamPartition ssp = 
Iterables.getFirst(context.getSystemStreamPartitions(), null);
-      if (ssp == null ? false : ssp.getPartition().getPartitionId() == 0) {
-        final int count = checkEachTask ? 
context.getSamzaContainerContext().taskNames.size() : 1;
-        LATCHES.put(id, new CountDownLatch(count));
-        timer.schedule(timerTask, TIMEOUT);
-      }
-    }
-
-    @Override
-    public void apply(M message, MessageCollector messageCollector, 
TaskCoordinator taskCoordinator) {
-      actual.add(message);
-
-      if (actual.size() >= expected.size()) {
-        timerTask.cancel();
-        check();
-      }
-    }
-
-    private void check() {
-      final CountDownLatch latch = LATCHES.get(id);
-      try {
-        assertThat(actual, Matchers.containsInAnyOrder((M[]) 
expected.toArray()));
-      } finally {
-        latch.countDown();
-      }
-    }
-  }
-}

Reply via email to