Repository: beam
Updated Branches:
  refs/heads/master 6be9e0bb2 -> 9c14d7397
Adapt Flink StateInternals to new state semantics

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9ddaf293
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9ddaf293
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9ddaf293

Branch: refs/heads/master
Commit: 9ddaf29316cd50137e71acc65114fa54b3c7e073
Parents: 6be9e0b
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Sep 27 10:00:36 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Fri Sep 29 17:04:10 2017 +0200

----------------------------------------------------------------------
 .../wrappers/streaming/state/FlinkStateInternals.java | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/9ddaf293/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index a0b015b..bbe79db 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import java.nio.ByteBuffer;
@@ -312,7 +313,7 @@ public class FlinkStateInternals<K> implements StateInternals {
StringSerializer.INSTANCE, flinkStateDescriptor).get(); - return result != null ? result : Collections.<T>emptyList(); + return result != null ? ImmutableList.copyOf(result) : Collections.<T>emptyList(); } catch (Exception e) { throw new RuntimeException("Error reading state.", e); } @@ -943,7 +944,7 @@ public class FlinkStateInternals<K> implements StateInternals { namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor).keys(); - return result != null ? result : Collections.<KeyT>emptyList(); + return result != null ? ImmutableList.copyOf(result) : Collections.<KeyT>emptyList(); } catch (Exception e) { throw new RuntimeException("Error get map state keys.", e); } @@ -966,7 +967,7 @@ public class FlinkStateInternals<K> implements StateInternals { namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor).values(); - return result != null ? result : Collections.<ValueT>emptyList(); + return result != null ? ImmutableList.copyOf(result) : Collections.<ValueT>emptyList(); } catch (Exception e) { throw new RuntimeException("Error get map state values.", e); } @@ -989,7 +990,9 @@ public class FlinkStateInternals<K> implements StateInternals { namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor).entries(); - return result != null ? result : Collections.<Map.Entry<KeyT, ValueT>>emptyList(); + return result != null + ? ImmutableList.copyOf(result) + : Collections.<Map.Entry<KeyT, ValueT>>emptyList(); } catch (Exception e) { throw new RuntimeException("Error get map state entries.", e); } @@ -1146,7 +1149,7 @@ public class FlinkStateInternals<K> implements StateInternals { namespace.stringKey(), StringSerializer.INSTANCE, flinkStateDescriptor).keys(); - return result != null ? result : Collections.<T>emptyList(); + return result != null ? ImmutableList.copyOf(result) : Collections.<T>emptyList(); } catch (Exception e) { throw new RuntimeException("Error read from state.", e); }