Repository: beam
Updated Branches:
  refs/heads/master 6be9e0bb2 -> 9c14d7397


Adapt Flink StateInternals to new state semantics


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9ddaf293
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9ddaf293
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9ddaf293

Branch: refs/heads/master
Commit: 9ddaf29316cd50137e71acc65114fa54b3c7e073
Parents: 6be9e0b
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Sep 27 10:00:36 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Fri Sep 29 17:04:10 2017 +0200

----------------------------------------------------------------------
 .../wrappers/streaming/state/FlinkStateInternals.java  | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9ddaf293/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index a0b015b..bbe79db 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import java.nio.ByteBuffer;
@@ -312,7 +313,7 @@ public class FlinkStateInternals<K> implements 
StateInternals {
             StringSerializer.INSTANCE,
             flinkStateDescriptor).get();
 
-        return result != null ? result : Collections.<T>emptyList();
+        return result != null ? ImmutableList.copyOf(result) : 
Collections.<T>emptyList();
       } catch (Exception e) {
         throw new RuntimeException("Error reading state.", e);
       }
@@ -943,7 +944,7 @@ public class FlinkStateInternals<K> implements 
StateInternals {
                 namespace.stringKey(),
                 StringSerializer.INSTANCE,
                 flinkStateDescriptor).keys();
-            return result != null ? result : Collections.<KeyT>emptyList();
+            return result != null ? ImmutableList.copyOf(result) : 
Collections.<KeyT>emptyList();
           } catch (Exception e) {
             throw new RuntimeException("Error get map state keys.", e);
           }
@@ -966,7 +967,7 @@ public class FlinkStateInternals<K> implements 
StateInternals {
                 namespace.stringKey(),
                 StringSerializer.INSTANCE,
                 flinkStateDescriptor).values();
-            return result != null ? result : Collections.<ValueT>emptyList();
+            return result != null ? ImmutableList.copyOf(result) : 
Collections.<ValueT>emptyList();
           } catch (Exception e) {
             throw new RuntimeException("Error get map state values.", e);
           }
@@ -989,7 +990,9 @@ public class FlinkStateInternals<K> implements 
StateInternals {
                 namespace.stringKey(),
                 StringSerializer.INSTANCE,
                 flinkStateDescriptor).entries();
-            return result != null ? result : Collections.<Map.Entry<KeyT, 
ValueT>>emptyList();
+            return result != null
+                ? ImmutableList.copyOf(result)
+                : Collections.<Map.Entry<KeyT, ValueT>>emptyList();
           } catch (Exception e) {
             throw new RuntimeException("Error get map state entries.", e);
           }
@@ -1146,7 +1149,7 @@ public class FlinkStateInternals<K> implements 
StateInternals {
             namespace.stringKey(),
             StringSerializer.INSTANCE,
             flinkStateDescriptor).keys();
-        return result != null ? result : Collections.<T>emptyList();
+        return result != null ? ImmutableList.copyOf(result) : 
Collections.<T>emptyList();
       } catch (Exception e) {
         throw new RuntimeException("Error read from state.", e);
       }

Reply via email to