Repository: incubator-beam
Updated Branches:
  refs/heads/master cca2577c6 -> 36a27f538


[Beam-312] don't checkpoint if CheckpointCoder not available

This skips the checkpoint logic in the UnboundedSourceWrapper if the
UnboundedSource doesn't supply a CheckpointMarkCoder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9c4072ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9c4072ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9c4072ad

Branch: refs/heads/master
Commit: 9c4072ad87f25248f77e437e5bcf674aff19982b
Parents: cca2577
Author: Maximilian Michels <m...@apache.org>
Authored: Mon May 30 15:59:12 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Sat May 28 16:17:15 2016 +0200

----------------------------------------------------------------------
 .../streaming/io/UnboundedSourceWrapper.java    | 24 +++++++++++++++++---
 1 file changed, 21 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c4072ad/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index b816e2a..7f26a65 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -120,10 +120,17 @@ public class UnboundedSourceWrapper<
     }
 
     Coder<CheckpointMarkT> checkpointMarkCoder = source.getCheckpointMarkCoder();
-    Coder<? extends UnboundedSource<OutputT, CheckpointMarkT>> sourceCoder =
-        SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointMarkT>>() {});
+    if (checkpointMarkCoder == null) {
+      LOG.info("No CheckpointMarkCoder specified for this source. Won't create snapshots.");
+      checkpointCoder = null;
+    } else {
+
+      Coder<? extends UnboundedSource<OutputT, CheckpointMarkT>> sourceCoder =
+          SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointMarkT>>() {
+          });
 
-    checkpointCoder = (ListCoder) ListCoder.of(KvCoder.of(sourceCoder, checkpointMarkCoder));
+      checkpointCoder = (ListCoder) ListCoder.of(KvCoder.of(sourceCoder, checkpointMarkCoder));
+    }
 
     // get the splits early. we assume that the generated splits are stable,
     // this is necessary so that the mapping of state to source is correct
@@ -308,6 +315,12 @@ public class UnboundedSourceWrapper<
 
   @Override
   public byte[] snapshotState(long l, long l1) throws Exception {
+
+    if (checkpointCoder == null) {
+      // no checkpoint coder available in this source
+      return null;
+    }
+
     // we checkpoint the sources along with the CheckpointMarkT to ensure
     // than we have a correct mapping of checkpoints to sources when
     // restoring
@@ -333,6 +346,11 @@ public class UnboundedSourceWrapper<
 
   @Override
   public void restoreState(byte[] bytes) throws Exception {
+    if (checkpointCoder == null) {
+      // no checkpoint coder available in this source
+      return;
+    }
+
     try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {
       restoredState = checkpointCoder.decode(bais, Coder.Context.OUTER);
     }

Reply via email to