Repository: beam
Updated Branches:
  refs/heads/master 8a00f2254 -> 83193698d


http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
new file mode 100644
index 0000000..90f95d6
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Matchers;
+
+/**
+ * Tests for {@link UnboundedSourceWrapper}.
+ */
+@RunWith(Enclosed.class)
+public class UnboundedSourceWrapperTest {
+
+  /**
+   * Parameterized tests.
+   */
+  @RunWith(Parameterized.class)
+  public static class UnboundedSourceWrapperTestWithParams {
+    private final int numTasks;
+    private final int numSplits;
+
+    public UnboundedSourceWrapperTestWithParams(int numTasks, int numSplits) {
+      this.numTasks = numTasks;
+      this.numSplits = numSplits;
+    }
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> data() {
+      /*
+       * Parameters for initializing the tests:
+       * {numTasks, numSplits}
+       * The test currently assumes powers of two for some assertions.
+       */
+      return Arrays.asList(new Object[][]{
+          {1, 1}, {1, 2}, {1, 4},
+          {2, 1}, {2, 2}, {2, 4},
+          {4, 1}, {4, 2}, {4, 4}
+      });
+    }
+
+    /**
+     * Creates a {@link UnboundedSourceWrapper} that has one or multiple 
readers per source.
+     * If numSplits > numTasks the source has one source will manage multiple 
readers.
+     */
+    @Test
+    public void testReaders() throws Exception {
+      final int numElements = 20;
+      final Object checkpointLock = new Object();
+      PipelineOptions options = PipelineOptionsFactory.create();
+
+      // this source will emit exactly NUM_ELEMENTS across all parallel 
readers,
+      // afterwards it will stall. We check whether we also receive 
NUM_ELEMENTS
+      // elements later.
+      TestCountingSource source = new TestCountingSource(numElements);
+      UnboundedSourceWrapper<KV<Integer, Integer>, 
TestCountingSource.CounterMark> flinkWrapper =
+          new UnboundedSourceWrapper<>(options, source, numSplits);
+
+      assertEquals(numSplits, flinkWrapper.getSplitSources().size());
+
+      StreamSource<WindowedValue<
+          KV<Integer, Integer>>,
+          UnboundedSourceWrapper<
+              KV<Integer, Integer>,
+              TestCountingSource.CounterMark>> sourceOperator = new 
StreamSource<>(flinkWrapper);
+
+      setupSourceOperator(sourceOperator, numTasks);
+
+      try {
+        sourceOperator.open();
+        sourceOperator.run(checkpointLock,
+            new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+              private int count = 0;
+
+              @Override
+              public void emitWatermark(Watermark watermark) {
+              }
+
+              @Override
+              public void emitLatencyMarker(LatencyMarker latencyMarker) {
+              }
+
+              @Override
+              public void collect(
+                  StreamRecord<WindowedValue<KV<Integer, Integer>>> 
windowedValueStreamRecord) {
+
+                count++;
+                if (count >= numElements) {
+                  throw new SuccessException();
+                }
+              }
+
+              @Override
+              public void close() {
+
+              }
+            });
+      } catch (SuccessException e) {
+
+        assertEquals(Math.max(1, numSplits / numTasks), 
flinkWrapper.getLocalSplitSources().size());
+
+        // success
+        return;
+      }
+      fail("Read terminated without producing expected number of outputs");
+    }
+
+    /**
+     * Verify that snapshot/restore work as expected. We bring up a source and 
cancel
+     * after seeing a certain number of elements. Then we snapshot that source,
+     * bring up a completely new source that we restore from the snapshot and 
verify
+     * that we see all expected elements in the end.
+     */
+    @Test
+    public void testRestore() throws Exception {
+      final int numElements = 20;
+      final Object checkpointLock = new Object();
+      PipelineOptions options = PipelineOptionsFactory.create();
+
+      // this source will emit exactly NUM_ELEMENTS across all parallel 
readers,
+      // afterwards it will stall. We check whether we also receive 
NUM_ELEMENTS
+      // elements later.
+      TestCountingSource source = new TestCountingSource(numElements);
+      UnboundedSourceWrapper<KV<Integer, Integer>, 
TestCountingSource.CounterMark> flinkWrapper =
+          new UnboundedSourceWrapper<>(options, source, numSplits);
+
+      assertEquals(numSplits, flinkWrapper.getSplitSources().size());
+
+      StreamSource<
+          WindowedValue<KV<Integer, Integer>>,
+          UnboundedSourceWrapper<
+              KV<Integer, Integer>,
+              TestCountingSource.CounterMark>> sourceOperator = new 
StreamSource<>(flinkWrapper);
+
+
+      OperatorStateStore backend = mock(OperatorStateStore.class);
+
+      TestingListState<KV<UnboundedSource, TestCountingSource.CounterMark>>
+          listState = new TestingListState<>();
+
+      when(backend.getOperatorState(Matchers.any(ListStateDescriptor.class)))
+          .thenReturn(listState);
+
+      StateInitializationContext initializationContext = 
mock(StateInitializationContext.class);
+
+      when(initializationContext.getOperatorStateStore()).thenReturn(backend);
+      when(initializationContext.isRestored()).thenReturn(false, true);
+
+      flinkWrapper.initializeState(initializationContext);
+
+      setupSourceOperator(sourceOperator, numTasks);
+
+      final Set<KV<Integer, Integer>> emittedElements = new HashSet<>();
+
+      boolean readFirstBatchOfElements = false;
+
+      try {
+        sourceOperator.open();
+        sourceOperator.run(checkpointLock,
+            new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+              private int count = 0;
+
+              @Override
+              public void emitWatermark(Watermark watermark) {
+              }
+
+              @Override
+              public void emitLatencyMarker(LatencyMarker latencyMarker) {
+              }
+
+              @Override
+              public void collect(
+                  StreamRecord<WindowedValue<KV<Integer, Integer>>> 
windowedValueStreamRecord) {
+
+                
emittedElements.add(windowedValueStreamRecord.getValue().getValue());
+                count++;
+                if (count >= numElements / 2) {
+                  throw new SuccessException();
+                }
+              }
+
+              @Override
+              public void close() {
+
+              }
+            });
+      } catch (SuccessException e) {
+        // success
+        readFirstBatchOfElements = true;
+      }
+
+      assertTrue("Did not successfully read first batch of elements.", 
readFirstBatchOfElements);
+
+      // draw a snapshot
+      flinkWrapper.snapshotState(new StateSnapshotContextSynchronousImpl(0, 
0));
+
+      // test snapshot offsets
+      assertEquals(flinkWrapper.getLocalSplitSources().size(),
+          listState.getList().size());
+      int totalEmit = 0;
+      for (KV<UnboundedSource, TestCountingSource.CounterMark> kv : 
listState.get()) {
+        totalEmit += kv.getValue().current + 1;
+      }
+      assertEquals(numElements / 2, totalEmit);
+
+      // test that finalizeCheckpoint on CheckpointMark is called
+      final ArrayList<Integer> finalizeList = new ArrayList<>();
+      TestCountingSource.setFinalizeTracker(finalizeList);
+      flinkWrapper.notifyCheckpointComplete(0);
+      assertEquals(flinkWrapper.getLocalSplitSources().size(), 
finalizeList.size());
+
+      // create a completely new source but restore from the snapshot
+      TestCountingSource restoredSource = new TestCountingSource(numElements);
+      UnboundedSourceWrapper<
+          KV<Integer, Integer>, TestCountingSource.CounterMark> 
restoredFlinkWrapper =
+          new UnboundedSourceWrapper<>(options, restoredSource, numSplits);
+
+      assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size());
+
+      StreamSource<
+          WindowedValue<KV<Integer, Integer>>,
+          UnboundedSourceWrapper<
+              KV<Integer, Integer>,
+              TestCountingSource.CounterMark>> restoredSourceOperator =
+          new StreamSource<>(restoredFlinkWrapper);
+
+      setupSourceOperator(restoredSourceOperator, numTasks);
+
+      // restore snapshot
+      restoredFlinkWrapper.initializeState(initializationContext);
+
+      boolean readSecondBatchOfElements = false;
+
+      // run again and verify that we see the other elements
+      try {
+        restoredSourceOperator.open();
+        restoredSourceOperator.run(checkpointLock,
+            new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+              private int count = 0;
+
+              @Override
+              public void emitWatermark(Watermark watermark) {
+              }
+
+              @Override
+              public void emitLatencyMarker(LatencyMarker latencyMarker) {
+              }
+
+              @Override
+              public void collect(
+                  StreamRecord<WindowedValue<KV<Integer, Integer>>> 
windowedValueStreamRecord) {
+                
emittedElements.add(windowedValueStreamRecord.getValue().getValue());
+                count++;
+                if (count >= numElements / 2) {
+                  throw new SuccessException();
+                }
+              }
+
+              @Override
+              public void close() {
+
+              }
+            });
+      } catch (SuccessException e) {
+        // success
+        readSecondBatchOfElements = true;
+      }
+
+      assertEquals(Math.max(1, numSplits / numTasks), 
flinkWrapper.getLocalSplitSources().size());
+
+      assertTrue("Did not successfully read second batch of elements.", 
readSecondBatchOfElements);
+
+      // verify that we saw all NUM_ELEMENTS elements
+      assertTrue(emittedElements.size() == numElements);
+    }
+
+    @Test
+    public void testNullCheckpoint() throws Exception {
+      final int numElements = 20;
+      PipelineOptions options = PipelineOptionsFactory.create();
+
+      TestCountingSource source = new TestCountingSource(numElements) {
+        @Override
+        public Coder<CounterMark> getCheckpointMarkCoder() {
+          return null;
+        }
+      };
+      UnboundedSourceWrapper<KV<Integer, Integer>, 
TestCountingSource.CounterMark> flinkWrapper =
+          new UnboundedSourceWrapper<>(options, source, numSplits);
+
+      OperatorStateStore backend = mock(OperatorStateStore.class);
+
+      TestingListState<KV<UnboundedSource, TestCountingSource.CounterMark>>
+          listState = new TestingListState<>();
+
+      when(backend.getOperatorState(Matchers.any(ListStateDescriptor.class)))
+          .thenReturn(listState);
+
+      StateInitializationContext initializationContext = 
mock(StateInitializationContext.class);
+
+      when(initializationContext.getOperatorStateStore()).thenReturn(backend);
+      when(initializationContext.isRestored()).thenReturn(false, true);
+
+      flinkWrapper.initializeState(initializationContext);
+
+      StreamSource sourceOperator = new StreamSource<>(flinkWrapper);
+      setupSourceOperator(sourceOperator, numTasks);
+      sourceOperator.open();
+
+      flinkWrapper.snapshotState(new StateSnapshotContextSynchronousImpl(0, 
0));
+
+      assertEquals(0, listState.getList().size());
+
+      UnboundedSourceWrapper<
+          KV<Integer, Integer>, TestCountingSource.CounterMark> 
restoredFlinkWrapper =
+          new UnboundedSourceWrapper<>(options, new 
TestCountingSource(numElements),
+              numSplits);
+
+      StreamSource restoredSourceOperator = new StreamSource<>(flinkWrapper);
+      setupSourceOperator(restoredSourceOperator, numTasks);
+      sourceOperator.open();
+
+      restoredFlinkWrapper.initializeState(initializationContext);
+
+      assertEquals(Math.max(1, numSplits / numTasks), 
flinkWrapper.getLocalSplitSources().size());
+
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> void setupSourceOperator(StreamSource<T, ?> operator, 
int numSubTasks) {
+      ExecutionConfig executionConfig = new ExecutionConfig();
+      StreamConfig cfg = new StreamConfig(new Configuration());
+
+      cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
+      Environment env = new DummyEnvironment("MockTwoInputTask", numSubTasks, 
0);
+
+      StreamTask<?, ?> mockTask = mock(StreamTask.class);
+      when(mockTask.getName()).thenReturn("Mock Task");
+      when(mockTask.getCheckpointLock()).thenReturn(new Object());
+      when(mockTask.getConfiguration()).thenReturn(cfg);
+      when(mockTask.getEnvironment()).thenReturn(env);
+      when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
+      when(mockTask.getAccumulatorMap())
+          .thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
+      TestProcessingTimeService testProcessingTimeService = new 
TestProcessingTimeService();
+      
when(mockTask.getProcessingTimeService()).thenReturn(testProcessingTimeService);
+
+      operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) 
mock(Output.class));
+    }
+
+    /**
+     * A special {@link RuntimeException} that we throw to signal that the 
test was successful.
+     */
+    private static class SuccessException extends RuntimeException {
+    }
+  }
+
+  /**
+   * Not parameterized tests.
+   */
+  public static class BasicTest {
+
+    /**
+     * Check serialization a {@link UnboundedSourceWrapper}.
+     */
+    @Test
+    public void testSerialization() throws Exception {
+      final int parallelism = 1;
+      final int numElements = 20;
+      PipelineOptions options = PipelineOptionsFactory.create();
+
+      TestCountingSource source = new TestCountingSource(numElements);
+      UnboundedSourceWrapper<KV<Integer, Integer>, 
TestCountingSource.CounterMark> flinkWrapper =
+          new UnboundedSourceWrapper<>(options, source, parallelism);
+
+      InstantiationUtil.serializeObject(flinkWrapper);
+    }
+
+  }
+
+  private static final class TestingListState<T> implements ListState<T> {
+
+    private final List<T> list = new ArrayList<>();
+
+    @Override
+    public void clear() {
+      list.clear();
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+      return list;
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+      list.add(value);
+    }
+
+    public List<T> getList() {
+      return list;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java
new file mode 100644
index 0000000..08a1e03
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.streaming;

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/resources/log4j-test.properties 
b/runners/flink/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..4c74d85
--- /dev/null
+++ b/runners/flink/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

Reply via email to