This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f3bcd9  [BEAM-7012] Support TestStream in streaming Flink Runner
     new c44c30f  Merge pull request #8383: [BEAM-7012] Support TestStream in 
streaming Flink Runner
6f3bcd9 is described below

commit 6f3bcd932d4fe70711cb3653192171dad67867dc
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Tue Apr 23 18:49:26 2019 +0200

    [BEAM-7012] Support TestStream in streaming Flink Runner
    
    TestStream is a way to construct a stream with control over elements and
    time. In total, 40 ValidatesRunner tests make use of this feature (tagged 
via
    UsesTestStream). So far only the DirectRunner supported TestStream which 
meant
    that those tests were not executed for other Runners, e.g. Flink.
    
    Implementing TestStream for Flink was not hard, except for supporting the
    processing time functionality for which a clean solution seems impossible.
    However, only 2 of the 40 UsesTestStream tests make use of processing time.
    An annotation (UsesTestStreamWithProcessingTime) was added to allow Runners 
to
    exclude those tests.
    
    This still adds 38 new ValidatesRunner tests in Flink streaming mode.
---
 runners/flink/flink_runner.gradle                  |  3 +-
 .../flink/FlinkStreamingTransformTranslators.java  | 37 +++++++++
 .../wrappers/streaming/io/TestStreamSource.java    | 80 ++++++++++++++++++++
 .../org/apache/beam/sdk/testing/TestStream.java    | 88 ++++++++++++++++++++++
 .../apache/beam/sdk/testing/UsesTestStream.java    |  2 +-
 ....java => UsesTestStreamWithProcessingTime.java} |  7 +-
 .../apache/beam/sdk/testing/TestStreamTest.java    | 21 +++++-
 .../apache/beam/sdk/transforms/GroupByKeyTest.java |  4 +-
 .../org/apache/beam/sdk/transforms/ParDoTest.java  |  7 +-
 9 files changed, 240 insertions(+), 9 deletions(-)

diff --git a/runners/flink/flink_runner.gradle 
b/runners/flink/flink_runner.gradle
index db6d3e3..4bc9944 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -151,10 +151,11 @@ def createValidatesRunnerTask(Map m) {
       excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
       if (config.streaming) {
         excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse'
+        excludeCategories 
'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
       } else {
         excludeCategories 
'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
+        excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
       }
-      excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
     }
   }
 }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 8d42d18..760efdd 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -49,14 +49,17 @@ import 
org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDo
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestStreamSource;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -70,6 +73,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.AppliedCombineFn;
+import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
@@ -147,6 +151,8 @@ class FlinkStreamingTransformTranslators {
     TRANSLATORS.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new 
GroupByKeyTranslator());
     TRANSLATORS.put(
         PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, new 
CombinePerKeyTranslator());
+
+    TRANSLATORS.put(PTransformTranslation.TEST_STREAM_TRANSFORM_URN, new 
TestStreamTranslator());
   }
 
   public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> 
getTranslator(
@@ -1284,6 +1290,37 @@ class FlinkStreamingTransformTranslators {
     }
   }
 
+  /** A translator to support {@link TestStream} with Flink. */
+  private static class TestStreamTranslator<T>
+      extends 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<TestStream<T>> {
+
+    @Override
+    void translateNode(TestStream<T> testStream, 
FlinkStreamingTranslationContext context) {
+      Coder<T> valueCoder = testStream.getValueCoder();
+
+      // Coder for the Elements in the TestStream
+      TestStream.TestStreamCoder<T> testStreamCoder = 
TestStream.TestStreamCoder.of(valueCoder);
+      final byte[] payload;
+      try {
+        payload = CoderUtils.encodeToByteArray(testStreamCoder, testStream);
+      } catch (CoderException e) {
+        throw new RuntimeException("Could not encode TestStream.", e);
+      }
+
+      WindowedValue.FullWindowedValueCoder<T> elementCoder =
+          WindowedValue.getFullCoder(valueCoder, GlobalWindow.Coder.INSTANCE);
+
+      DataStreamSource<WindowedValue<T>> source =
+          context
+              .getExecutionEnvironment()
+              .addSource(
+                  new TestStreamSource<>(testStreamCoder, payload),
+                  new CoderTypeInformation<>(elementCoder));
+
+      context.setOutputDataStream(context.getOutput(testStream), source);
+    }
+  }
+
   /**
    * Wrapper for {@link UnboundedSourceWrapper}, which simplifies output type, 
namely, removes
    * {@link ValueWithRecordId}.
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestStreamSource.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestStreamSource.java
new file mode 100644
index 0000000..04c6604
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestStreamSource.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+
+import java.util.List;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.joda.time.Instant;
+
+/** Flink source for executing {@link org.apache.beam.sdk.testing.TestStream}. 
*/
+public class TestStreamSource<T> extends RichSourceFunction<WindowedValue<T>> {
+
+  private final TestStream.TestStreamCoder<T> testStreamCoder;
+  private final byte[] payload;
+
+  private volatile boolean isRunning = true;
+
+  public TestStreamSource(TestStream.TestStreamCoder<T> testStreamCoder, 
byte[] payload) {
+    this.testStreamCoder = testStreamCoder;
+    this.payload = payload;
+  }
+
+  @Override
+  public void run(SourceContext<WindowedValue<T>> ctx) throws CoderException {
+    TestStream<T> testStream = CoderUtils.decodeFromByteArray(testStreamCoder, 
payload);
+    List<TestStream.Event<T>> events = testStream.getEvents();
+
+    for (int eventId = 0; isRunning && eventId < events.size(); eventId++) {
+      TestStream.Event<T> event = events.get(eventId);
+
+      synchronized (ctx.getCheckpointLock()) {
+        if (event instanceof TestStream.ElementEvent) {
+          for (TimestampedValue<T> element : ((TestStream.ElementEvent<T>) 
event).getElements()) {
+            Instant timestamp = element.getTimestamp();
+            WindowedValue<T> value =
+                WindowedValue.of(
+                    element.getValue(), timestamp, GlobalWindow.INSTANCE, 
PaneInfo.NO_FIRING);
+            ctx.collectWithTimestamp(value, timestamp.getMillis());
+          }
+        } else if (event instanceof TestStream.WatermarkEvent) {
+          long millis = ((TestStream.WatermarkEvent<T>) 
event).getWatermark().getMillis();
+          ctx.emitWatermark(new Watermark(millis));
+        } else if (event instanceof TestStream.ProcessingTimeEvent) {
+          // There seems to be no clean way to implement this
+          throw new UnsupportedOperationException(
+              "Advancing Processing time is not supported by the Flink 
Runner.");
+        } else {
+          throw new IllegalStateException("Unknown event type " + event);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void cancel() {
+    this.isRunning = false;
+  }
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
index f137b6f..6ab4c0f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -21,12 +21,21 @@ import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
 import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.DurationCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaCoder;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -39,6 +48,7 @@ import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -300,4 +310,82 @@ public final class TestStream<T> extends 
PTransform<PBegin, PCollection<T>> {
   public int hashCode() {
     return Objects.hash(TestStream.class, getValueCoder(), getEvents());
   }
+
+  /** Coder for {@link TestStream}. */
+  public static class TestStreamCoder<T> extends 
StructuredCoder<TestStream<T>> {
+
+    private final TimestampedValue.TimestampedValueCoder<T> elementCoder;
+
+    public static <T> TestStreamCoder<T> of(Coder<T> valueCoder) {
+      return new TestStreamCoder<>(valueCoder);
+    }
+
+    private TestStreamCoder(Coder<T> valueCoder) {
+      this.elementCoder = 
TimestampedValue.TimestampedValueCoder.of(valueCoder);
+    }
+
+    @Override
+    public void encode(TestStream<T> value, OutputStream outStream) throws 
IOException {
+      List<Event<T>> events = value.getEvents();
+      VarIntCoder.of().encode(events.size(), outStream);
+
+      for (Event event : events) {
+        if (event instanceof ElementEvent) {
+          outStream.write(event.getType().ordinal());
+          Iterable<TimestampedValue<T>> elements = ((ElementEvent) 
event).getElements();
+          VarIntCoder.of().encode(Iterables.size(elements), outStream);
+          for (TimestampedValue<T> element : elements) {
+            elementCoder.encode(element, outStream);
+          }
+        } else if (event instanceof WatermarkEvent) {
+          outStream.write(event.getType().ordinal());
+          Instant watermark = ((WatermarkEvent) event).getWatermark();
+          InstantCoder.of().encode(watermark, outStream);
+        } else if (event instanceof ProcessingTimeEvent) {
+          outStream.write(event.getType().ordinal());
+          Duration processingTimeAdvance = ((ProcessingTimeEvent) 
event).getProcessingTimeAdvance();
+          DurationCoder.of().encode(processingTimeAdvance, outStream);
+        }
+      }
+    }
+
+    @Override
+    public TestStream<T> decode(InputStream inStream) throws IOException {
+      Integer numberOfEvents = VarIntCoder.of().decode(inStream);
+      List<Event<T>> events = new ArrayList<>(numberOfEvents);
+
+      for (int i = 0; i < numberOfEvents; i++) {
+        EventType eventType = EventType.values()[inStream.read()];
+        switch (eventType) {
+          case ELEMENT:
+            int numElements = VarIntCoder.of().decode(inStream);
+            List<TimestampedValue<T>> elements = new ArrayList<>(numElements);
+            for (int j = 0; j < numElements; j++) {
+              elements.add(elementCoder.decode(inStream));
+            }
+            events.add(ElementEvent.add(elements));
+            break;
+          case WATERMARK:
+            Instant watermark = InstantCoder.of().decode(inStream);
+            events.add(WatermarkEvent.advanceTo(watermark));
+            break;
+          case PROCESSING_TIME:
+            Duration duration = 
DurationCoder.of().decode(inStream).toDuration();
+            events.add(ProcessingTimeEvent.advanceBy(duration));
+            break;
+          default:
+            throw new IllegalStateException("Unknown event type + " + 
eventType);
+        }
+      }
+      return TestStream.fromRawEvents(elementCoder.getValueCoder(), events);
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return Collections.singletonList(elementCoder);
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {}
+  }
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java
index c3f6e26..97e7f08 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java
@@ -19,6 +19,6 @@ package org.apache.beam.sdk.testing;
 
 /**
  * Category tag for tests that use {@link TestStream}, which is not a part of 
the Beam model but a
- * special feature currently only implemented by the direct runner.
+ * special feature currently only implemented by the direct runner and the 
Flink Runner (streaming).
  */
 public interface UsesTestStream {}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStreamWithProcessingTime.java
similarity index 74%
copy from 
sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java
copy to 
sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStreamWithProcessingTime.java
index c3f6e26..164e9fd 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStreamWithProcessingTime.java
@@ -18,7 +18,8 @@
 package org.apache.beam.sdk.testing;
 
 /**
- * Category tag for tests that use {@link TestStream}, which is not a part of 
the Beam model but a
- * special feature currently only implemented by the direct runner.
+ * Subcategory for {@link UsesTestStream} tests which use the processing time 
feature of {@link
+ * TestStream}. Some Runners do not support setting processing time globally 
in a way that {@link
+ * TestStream} demands it.
  */
-public interface UsesTestStream {}
+public interface UsesTestStreamWithProcessingTime extends UsesTestStream {}
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index e1a32fa..e21acaf 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -17,11 +17,11 @@
  */
 package org.apache.beam.sdk.testing;
 
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
 import java.util.stream.StreamSupport;
@@ -48,6 +48,7 @@ import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Never;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -384,4 +385,22 @@ public class TestStreamTest implements Serializable {
 
     p.run().waitUntilFinish();
   }
+
+  @Test
+  public void testCoder() throws Exception {
+    TestStream<String> testStream =
+        TestStream.create(StringUtf8Coder.of())
+            .addElements("hey")
+            .advanceWatermarkTo(Instant.ofEpochMilli(22521600))
+            .advanceProcessingTime(Duration.millis(42))
+            .addElements("hey", "joe")
+            .advanceWatermarkToInfinity();
+
+    TestStream.TestStreamCoder<String> coder = 
TestStream.TestStreamCoder.of(StringUtf8Coder.of());
+
+    byte[] bytes = CoderUtils.encodeToByteArray(coder, testStream);
+    TestStream<String> recoveredStream = CoderUtils.decodeFromByteArray(coder, 
bytes);
+
+    assertThat(recoveredStream, is(testStream));
+  }
 }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index adb5096..a0e8311 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -53,7 +53,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
-import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
@@ -150,7 +150,7 @@ public class GroupByKeyTest implements Serializable {
      * a spurious output.
      */
     @Test
-    @Category({ValidatesRunner.class, UsesTestStream.class})
+    @Category({ValidatesRunner.class, UsesTestStreamWithProcessingTime.class})
     public void testCombiningAccumulatingProcessingTime() throws Exception {
       PCollection<Integer> triggeredSums =
           p.apply(
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index d552886..84189fc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -85,6 +85,7 @@ import org.apache.beam.sdk.testing.UsesSetState;
 import org.apache.beam.sdk.testing.UsesSideInputs;
 import org.apache.beam.sdk.testing.UsesStatefulParDo;
 import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime;
 import org.apache.beam.sdk.testing.UsesTimersInParDo;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.DoFn.OnTimer;
@@ -3065,7 +3066,11 @@ public class ParDoTest implements Serializable {
      * timers when a "set" method is called on it before it goes off.
      */
     @Test
-    @Category({ValidatesRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      UsesTestStreamWithProcessingTime.class
+    })
     public void testProcessingTimeTimerCanBeReset() throws Exception {
       final String timerId = "foo";
 

Reply via email to