This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6148a6d063a0503ee435ab5084fcba3fb864b26f
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
AuthorDate: Thu Feb 22 14:26:16 2018 +0100

    [BEAM-3727] Never shutdown sources in Flink Streaming execution mode
    
    This adds an option that controls whether sources are shut down once
    they reach the +Inf watermark.
    
    The reason for this is https://issues.apache.org/jira/browse/FLINK-2491,
    which causes checkpointing to stop once some source is shut down.
---
 .../beam/runners/flink/FlinkPipelineOptions.java   | 12 +++++
 .../apache/beam/runners/flink/TestFlinkRunner.java |  1 +
 .../streaming/io/BoundedSourceWrapper.java         | 28 +++++++++++
 .../streaming/io/UnboundedSourceWrapper.java       | 56 ++++++++++++----------
 4 files changed, 71 insertions(+), 26 deletions(-)

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 01f7847..b2cbefb 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -137,4 +137,16 @@ public interface FlinkPipelineOptions
   Long getMaxBundleTimeMills();
   void setMaxBundleTimeMills(Long time);
 
+  /**
+   * Whether to shutdown sources when their watermark reaches {@code +Inf}. 
For production use
+   * cases you want this to be disabled because Flink will currently (versions 
{@literal <=} 1.5)
+   * stop doing checkpoints when any operator (which includes sources) is 
finished.
+   *
+   * <p>Please see <a 
href="https://issues.apache.org/jira/browse/FLINK-2491">FLINK-2491</a> for
+   * progress on this issue.
+   */
+  @Description("If set, shutdown sources when their watermark reaches +Inf.")
+  @Default.Boolean(false)
+  Boolean isShutdownSourcesOnFinalWatermark();
+  void setShutdownSourcesOnFinalWatermark(Boolean shutdownOnFinalWatermark);
 }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
index 01b67e5..47d4494 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
@@ -36,6 +36,7 @@ public class TestFlinkRunner extends 
PipelineRunner<PipelineResult> {
   private TestFlinkRunner(FlinkPipelineOptions options) {
     // We use [auto] for testing since this will make it pick up the Testing 
ExecutionEnvironment
     options.setFlinkMaster("[auto]");
+    options.setShutdownSourcesOnFinalWatermark(true);
     this.delegate = FlinkRunner.fromOptions(options);
   }
 
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
index 5ddc46f..6db5426 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -180,6 +181,33 @@ public class BoundedSourceWrapper<OutputT>
 
     // emit final Long.MAX_VALUE watermark, just to be sure
     ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
+
+    FlinkPipelineOptions options = 
serializedOptions.get().as(FlinkPipelineOptions.class);
+    if (!options.isShutdownSourcesOnFinalWatermark()) {
+      // do nothing, but still look busy ...
+      // we can't return here since Flink requires that all operators stay up,
+      // otherwise checkpointing would not work correctly anymore
+      //
+      // See https://issues.apache.org/jira/browse/FLINK-2491 for progress on 
this issue
+
+      // wait until this is canceled
+      final Object waitLock = new Object();
+      while (isRunning) {
+        try {
+          // Flink will interrupt us at some point
+          //noinspection SynchronizationOnLocalVariableOrMethodParameter
+          synchronized (waitLock) {
+            // don't wait indefinitely, in case something goes horribly wrong
+            waitLock.wait(1000);
+          }
+        } catch (InterruptedException e) {
+          if (!isRunning) {
+            // restore the interrupted state, and fall through the loop
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+    }
   }
 
   /**
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 817dd74..fc23c01 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
@@ -224,32 +225,7 @@ public class UnboundedSourceWrapper<
             serializedOptions.get(),
             metricContainer);
 
-    if (localReaders.size() == 0) {
-      // do nothing, but still look busy ...
-      // also, output a Long.MAX_VALUE watermark since we know that we're not
-      // going to emit anything
-      // we can't return here since Flink requires that all operators stay up,
-      // otherwise checkpointing would not work correctly anymore
-      ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
-
-      // wait until this is canceled
-      final Object waitLock = new Object();
-      while (isRunning) {
-        try {
-          // Flink will interrupt us at some point
-          //noinspection SynchronizationOnLocalVariableOrMethodParameter
-          synchronized (waitLock) {
-            // don't wait indefinitely, in case something goes horribly wrong
-            waitLock.wait(1000);
-          }
-        } catch (InterruptedException e) {
-          if (!isRunning) {
-            // restore the interrupted state, and fall through the loop
-            Thread.currentThread().interrupt();
-          }
-        }
-      }
-    } else if (localReaders.size() == 1) {
+    if (localReaders.size() == 1) {
       // the easy case, we just read from one reader
       UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(0);
 
@@ -305,7 +281,35 @@ public class UnboundedSourceWrapper<
           hadData = false;
         }
       }
+    }
 
+    ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
+
+    FlinkPipelineOptions options = 
serializedOptions.get().as(FlinkPipelineOptions.class);
+    if (!options.isShutdownSourcesOnFinalWatermark()) {
+      // do nothing, but still look busy ...
+      // we can't return here since Flink requires that all operators stay up,
+      // otherwise checkpointing would not work correctly anymore
+      //
+      // See https://issues.apache.org/jira/browse/FLINK-2491 for progress on 
this issue
+
+      // wait until this is canceled
+      final Object waitLock = new Object();
+      while (isRunning) {
+        try {
+          // Flink will interrupt us at some point
+          //noinspection SynchronizationOnLocalVariableOrMethodParameter
+          synchronized (waitLock) {
+            // don't wait indefinitely, in case something goes horribly wrong
+            waitLock.wait(1000);
+          }
+        } catch (InterruptedException e) {
+          if (!isRunning) {
+            // restore the interrupted state, and fall through the loop
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
     }
   }
 

-- 
To stop receiving notification emails like this one, please contact
aljos...@apache.org.

Reply via email to