[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6156


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195136079
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
 ---
@@ -31,4 +31,22 @@
  * @param  The type of elements in the state
  * @param  The type of the resulting element in the state
  */
-public interface InternalAppendingState extends 
InternalKvState, AppendingState {}
+public interface InternalAppendingState extends 
InternalKvState, AppendingState {
--- End diff --

I had second thoughts about this, and I think adding the methods only to 
`InternalAppendingState` might be the better choice in the end, because only 
this interface seems to need a way of manipulating the internal type. Sorry 
for that :)


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195120961
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
 ---
@@ -22,7 +22,7 @@
 
 /**
  * The peer to the {@link AppendingState} in the internal state type 
hierarchy.
- * 
+ *
--- End diff --

Please revert this.


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195118823
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
 ---
@@ -30,7 +30,7 @@
  * @param  The type of the value.
  */
 public class HeapValueState
-   extends AbstractHeapState>
+   extends AbstractHeapState
--- End diff --

The same also holds for `HeapMapState` etc.


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195118305
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
 ---
@@ -30,7 +30,7 @@
  * @param  The type of the value.
  */
 public class HeapValueState
-   extends AbstractHeapState>
+   extends AbstractHeapState
--- End diff --

You could also consider using the new `getInternal()` and 
`updateInternal()` methods inside the methods of this class, to replace 
direct calls to the `stateTable`.
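
A minimal sketch of what that delegation could look like. It is not the actual `HeapValueState`: the class name, the `HashMap` standing in for the state table, and the `setCurrentKey` helper are assumptions made only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class HeapValueStateSketch<K, V> {
    // Hypothetical stand-in for the real state table: a plain map per key.
    private final Map<K, V> stateTable = new HashMap<>();
    private final V defaultValue;
    private K currentKey;

    public HeapValueStateSketch(V defaultValue) {
        this.defaultValue = defaultValue;
    }

    public void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // The only two methods in this sketch that touch the table directly.
    public V getInternal() {
        return stateTable.get(currentKey);
    }

    public void updateInternal(V valueToStore) {
        stateTable.put(currentKey, valueToStore);
    }

    // User-facing read: goes through getInternal() and applies the default.
    public V value() {
        V stored = getInternal();
        return stored == null ? defaultValue : stored;
    }

    // User-facing write: goes through updateInternal().
    public void update(V value) {
        updateInternal(value);
    }
}
```

With this shape, a change to how the table is accessed only has to be made in the two internal accessors.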


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195116930
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractStateTableSnapshot.java
 ---
@@ -48,4 +48,4 @@
@Override
public void release() {
}
-}
\ No newline at end of file
+}
--- End diff --

Please revert this; it produces a change in an unrelated class.


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195116411
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java
 ---
@@ -34,12 +33,10 @@
  * @param  The type of the input elements.
  * @param  The type of the values in the state.
  * @param  The type of the output elements.
- * @param  The type of State
  */
-public abstract class AbstractHeapMergingState
-   extends AbstractHeapState
-   implements InternalMergingState {
-
+public abstract class AbstractHeapMergingState
+   extends AbstractHeapState
+   implements InternalMergingState, 
org.apache.flink.runtime.state.internal.InternalAppendingState {
--- End diff --

I would make this a regular import.


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195114015
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 ---
@@ -94,6 +97,15 @@ public RocksDBListState(
return valueSerializer;
}
 
+   @Override
+   public byte[] getSerializedValue(
--- End diff --

I don't think this override makes sense.


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195111820
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 ---
@@ -237,4 +249,20 @@ public void addAll(List values) throws Exception {
 
return keySerializationStream.toByteArray();
}
+
+   @Override
+   public List getInternal() {
+   Iterable list = get();
+   if (list == null) {
+   return null;
+   }
+   List collected = new ArrayList<>();
--- End diff --

We could currently also save the whole repacking if we change the signature 
of `Iterable get()` in this class to return `List`.
However, in the long run it might be worth considering basing this class on 
`Iterable` instead of `List`, because we essentially only use iterable 
semantics. @aljoscha what do you think?


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195089054
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
 ---
@@ -31,4 +31,22 @@
  * @param  The type of elements in the state
  * @param  The type of the resulting element in the state
  */
-public interface InternalAppendingState extends 
InternalKvState, AppendingState {}
+public interface InternalAppendingState extends 
InternalKvState, AppendingState {
--- End diff --

It almost seems to me that these methods are not truly specific to merging 
state and could simply become part of the `InternalKvState` interface; the 
abstract implementations could then move directly to the respective abstract 
classes `AbstractRocksDBState` and `AbstractHeapState`.


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-13 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r195019971
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 ---
@@ -237,4 +249,20 @@ public void addAll(List values) throws Exception {
 
return keySerializationStream.toByteArray();
}
+
+   @Override
+   public List getInternal() {
+   Iterable list = get();
+   if (list == null) {
+   return null;
+   }
+   List collected = new ArrayList<>();
--- End diff --

Maybe we could give this a small initial capacity, e.g. 4.


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-12 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r194952224
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
 ---
@@ -87,55 +87,14 @@ public RocksDBAggregatingState(
 
@Override
public R get() throws IOException {
-   try {
-   // prepare the current key and namespace for RocksDB 
lookup
-   writeCurrentKeyWithGroupAndNamespace();
-   final byte[] key = keySerializationStream.toByteArray();
-
-   // get the current value
-   final byte[] valueBytes = backend.db.get(columnFamily, 
key);
-
-   if (valueBytes == null) {
-   return null;
-   }
-
-   ACC accumulator = valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
-   return aggFunction.getResult(accumulator);
-   }
-   catch (IOException | RocksDBException e) {
-   throw new IOException("Error while retrieving value 
from RocksDB", e);
-   }
+   return aggFunction.getResult(getInternal());
}
 
@Override
public void add(T value) throws IOException {
-   try {
-   // prepare the current key and namespace for RocksDB 
lookup
-   writeCurrentKeyWithGroupAndNamespace();
-   final byte[] key = keySerializationStream.toByteArray();
-   keySerializationStream.reset();
-
-   // get the current value
-   final byte[] valueBytes = backend.db.get(columnFamily, 
key);
-
-   // deserialize the current accumulator, or create a 
blank one
-   ACC accumulator = valueBytes == null ?
-   aggFunction.createAccumulator() :
-   valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
-
-   // aggregate the value into the accumulator
-   accumulator = aggFunction.add(value, accumulator);
-
-   // serialize the new accumulator
-   final DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(keySerializationStream);
-   valueSerializer.serialize(accumulator, out);
-
-   // write the new value to RocksDB
-   backend.db.put(columnFamily, writeOptions, key, 
keySerializationStream.toByteArray());
-   }
-   catch (IOException | RocksDBException e) {
-   throw new IOException("Error while adding value to 
RocksDB", e);
-   }
+   ACC accumulator = getInternal();
+   accumulator = accumulator == null ? 
aggFunction.createAccumulator() : accumulator;
+   updateInternal(aggFunction.add(value, accumulator));
--- End diff --

This is the same as for `FoldState` and `ReducingState`. Do you think this 
should be improved? I'm not sure, because serializing the key bytes does not 
seem that expensive in most use cases.


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-12 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r194951927
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
 ---
@@ -87,55 +87,14 @@ public RocksDBAggregatingState(
 
@Override
public R get() throws IOException {
-   try {
-   // prepare the current key and namespace for RocksDB 
lookup
-   writeCurrentKeyWithGroupAndNamespace();
-   final byte[] key = keySerializationStream.toByteArray();
-
-   // get the current value
-   final byte[] valueBytes = backend.db.get(columnFamily, 
key);
-
-   if (valueBytes == null) {
-   return null;
-   }
-
-   ACC accumulator = valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
-   return aggFunction.getResult(accumulator);
-   }
-   catch (IOException | RocksDBException e) {
-   throw new IOException("Error while retrieving value 
from RocksDB", e);
-   }
+   return aggFunction.getResult(getInternal());
}
 
@Override
public void add(T value) throws IOException {
-   try {
-   // prepare the current key and namespace for RocksDB 
lookup
-   writeCurrentKeyWithGroupAndNamespace();
-   final byte[] key = keySerializationStream.toByteArray();
-   keySerializationStream.reset();
-
-   // get the current value
-   final byte[] valueBytes = backend.db.get(columnFamily, 
key);
-
-   // deserialize the current accumulator, or create a 
blank one
-   ACC accumulator = valueBytes == null ?
-   aggFunction.createAccumulator() :
-   valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
-
-   // aggregate the value into the accumulator
-   accumulator = aggFunction.add(value, accumulator);
-
-   // serialize the new accumulator
-   final DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(keySerializationStream);
-   valueSerializer.serialize(accumulator, out);
-
-   // write the new value to RocksDB
-   backend.db.put(columnFamily, writeOptions, key, 
keySerializationStream.toByteArray());
-   }
-   catch (IOException | RocksDBException e) {
-   throw new IOException("Error while adding value to 
RocksDB", e);
-   }
+   ACC accumulator = getInternal();
+   accumulator = accumulator == null ? 
aggFunction.createAccumulator() : accumulator;
+   updateInternal(aggFunction.add(value, accumulator));
--- End diff --

We currently have to serialize the key bytes twice; the previous version 
only needed to serialize them once.
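
To make the cost concrete, here is a toy model that counts key serializations. It is only a sketch: the in-memory map, the string key, and the counter are assumptions standing in for RocksDB and `writeCurrentKeyWithGroupAndNamespace()`.

```java
import java.util.HashMap;
import java.util.Map;

public class KeySerializationCountSketch {
    // Hypothetical stand-in for the RocksDB column family.
    private final Map<String, Integer> db = new HashMap<>();
    private final String currentKey = "k";
    private int keySerializations = 0;

    // Stand-in for serializing key, key group and namespace; counts each call.
    private String serializeKey() {
        keySerializations++;
        return "key:" + currentKey;
    }

    public int keySerializations() {
        return keySerializations;
    }

    public Integer getInternal() {
        return db.get(serializeKey());
    }

    public void updateInternal(Integer valueToStore) {
        db.put(serializeKey(), valueToStore);
    }

    // Shape of the refactored add(): read and write each serialize the key.
    public void addViaInternalAccessors(int value) {
        Integer acc = getInternal();
        updateInternal((acc == null ? 0 : acc) + value);
    }

    // Shape of the previous add(): the key is serialized once and reused.
    public void addWithSharedKey(int value) {
        String key = serializeKey();
        Integer acc = db.get(key);
        db.put(key, (acc == null ? 0 : acc) + value);
    }
}
```

In this model one `addViaInternalAccessors` call serializes the key twice, while `addWithSharedKey` does so once; both store the same result.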


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-12 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r194951490
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
 ---
@@ -0,0 +1,71 @@
+package org.apache.flink.contrib.streaming.state;
--- End diff --

RAT problem.


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-12 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r194951510
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapAppendingState.java
 ---
@@ -0,0 +1,48 @@
+package org.apache.flink.runtime.state.heap;
--- End diff --

RAT problem.


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-12 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r194951373
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
 ---
@@ -0,0 +1,71 @@
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.internal.InternalAppendingState;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+
+import java.io.IOException;
+
+abstract class AbstractRocksDBAppendingState 
+   extends AbstractRocksDBState
+   implements InternalAppendingState {
+
+   /**
+* Creates a new RocksDB backed state.
--- End diff --

typo: `backed` -> `backend`


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-12 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r194947600
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
 ---
@@ -0,0 +1,71 @@
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.internal.InternalAppendingState;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+
+import java.io.IOException;
+
+abstract class AbstractRocksDBAppendingState 
+   extends AbstractRocksDBState
+   implements InternalAppendingState {
+
+   /**
+* Creates a new RocksDB backed state.
+*
+* @param columnFamilyThe RocksDB column family that this state 
is associated to.
+* @param namespaceSerializer The serializer for the namespace.
+* @param valueSerializer The serializer for the state.
+* @param defaultValueThe default value for the state.
+* @param backend The backend for which this state is bind 
to.
+*/
+   protected AbstractRocksDBAppendingState(
+   ColumnFamilyHandle columnFamily,
+   TypeSerializer namespaceSerializer,
+   TypeSerializer valueSerializer,
+   SV defaultValue,
+   RocksDBKeyedStateBackend backend) {
+   super(columnFamily, namespaceSerializer, valueSerializer, 
defaultValue, backend);
+   }
+
+   @Override
+   public SV getInternal() throws IOException {
+   try {
+   writeCurrentKeyWithGroupAndNamespace();
+   byte[] key = keySerializationStream.toByteArray();
+   byte[] valueBytes = backend.db.get(columnFamily, key);
+   if (valueBytes == null) {
+   return null;
+   }
+   return valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+   } catch (IOException | RocksDBException e) {
+   throw new IOException("Error while retrieving data from 
RocksDB", e);
+   }
+   }
+
+   @Override
+   public void updateInternal(SV valueToStore) throws IOException {
+   try {
+   // prepare the current key and namespace for RocksDB 
lookup
+   writeCurrentKeyWithGroupAndNamespace();
+   final byte[] key = keySerializationStream.toByteArray();
+   keySerializationStream.reset();
+
+   // serialize new value
+   final DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(keySerializationStream);
+   valueSerializer.serialize(valueToStore, out);
+
+   // write the new value to RocksDB
+   backend.db.put(columnFamily, writeOptions, key, 
keySerializationStream.toByteArray());
+   }
+   catch (IOException | RocksDBException e) {
+   throw new IOException("Error while adding value to 
RocksDB", e);
--- End diff --

nit: throwing `IOException` here seems a bit odd. Could this be replaced 
by `FlinkRuntimeException`?


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-12 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r194946663
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
 ---
@@ -87,55 +87,14 @@ public RocksDBAggregatingState(
 
@Override
public R get() throws IOException {
-   try {
-   // prepare the current key and namespace for RocksDB 
lookup
-   writeCurrentKeyWithGroupAndNamespace();
-   final byte[] key = keySerializationStream.toByteArray();
-
-   // get the current value
-   final byte[] valueBytes = backend.db.get(columnFamily, 
key);
-
-   if (valueBytes == null) {
-   return null;
-   }
-
-   ACC accumulator = valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
-   return aggFunction.getResult(accumulator);
-   }
-   catch (IOException | RocksDBException e) {
-   throw new IOException("Error while retrieving value 
from RocksDB", e);
-   }
+   return aggFunction.getResult(getInternal());
--- End diff --

This might throw an `NPE`, because `getInternal()` can return `null`.
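
A small sketch of the guard, mirroring the removed `if (valueBytes == null) return null;` branch. The `Acc` type and the averaging `getResult` are hypothetical stand-ins for the accumulator and the aggregate function.

```java
public class NullSafeGetSketch {
    /** Hypothetical accumulator: running sum and element count. */
    public static final class Acc {
        public long sum;
        public long count;
    }

    // Stand-in for aggFunction.getResult(...); dereferences the accumulator.
    public static Double getResult(Acc acc) {
        return (double) acc.sum / acc.count;
    }

    // Shape of the new get(): fails with an NPE when nothing is stored.
    public static Double getUnguarded(Acc stored) {
        return getResult(stored);
    }

    // Guarded variant: returns null for empty state, as the old code did.
    public static Double getGuarded(Acc stored) {
        return stored == null ? null : getResult(stored);
    }
}
```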


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-12 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r194795632
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 ---
@@ -237,4 +242,16 @@ public void addAll(List values) throws Exception {
 
return keySerializationStream.toByteArray();
}
+
+   @Override
+   public List getInternal() {
+   List list = new ArrayList<>();
+   get().forEach(list::add);
--- End diff --

There could be an NPE here, because `get()` may return `null`.
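
One possible null-safe shape, as a sketch only; the helper name `toListOrNull` is an assumption and not part of the pull request.

```java
import java.util.ArrayList;
import java.util.List;

public class IterableToListSketch {
    // Copies the elements into a new list, or returns null if there is no state.
    public static <V> List<V> toListOrNull(Iterable<V> values) {
        if (values == null) {
            return null;
        }
        List<V> collected = new ArrayList<>();
        for (V value : values) {
            collected.add(value);
        }
        return collected;
    }
}
```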


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-12 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r194796355
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 ---
@@ -94,6 +94,11 @@ public RocksDBListState(
return valueSerializer;
}
 
+   @Override
+   public byte[] getSerializedValue(byte[] serializedKeyAndNamespace, 
TypeSerializer safeKeySerializer, TypeSerializer safeNamespaceSerializer, 
TypeSerializer> safeValueSerializer) throws Exception {
+   return new byte[0];
--- End diff --

Maybe we could use a static field to avoid allocating a new `byte[0]` every time.
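
For illustration, a sketch of the shared constant; the class and method names are hypothetical. Sharing is safe because a zero-length array has no elements that a caller could modify.

```java
public class EmptyBytesSketch {
    // Allocated once and reused by every call.
    private static final byte[] EMPTY_BYTES = new byte[0];

    // Stand-in for the overridden method body in the diff.
    public static byte[] emptySerializedValue() {
        return EMPTY_BYTES;
    }
}
```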


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-12 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6156#discussion_r194790994
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
 ---
@@ -87,40 +83,14 @@ public RocksDBFoldingState(ColumnFamilyHandle 
columnFamily,
}
 
@Override
-   public ACC get() {
-   try {
-   writeCurrentKeyWithGroupAndNamespace();
-   byte[] key = keySerializationStream.toByteArray();
-   byte[] valueBytes = backend.db.get(columnFamily, key);
-   if (valueBytes == null) {
-   return null;
-   }
-   return valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
-   } catch (IOException | RocksDBException e) {
-   throw new RuntimeException("Error while retrieving data 
from RocksDB", e);
-   }
+   public ACC get() throws IOException {
+   return getInternal();
}
 
@Override
-   public void add(T value) throws IOException {
-   try {
-   writeCurrentKeyWithGroupAndNamespace();
-   byte[] key = keySerializationStream.toByteArray();
-   byte[] valueBytes = backend.db.get(columnFamily, key);
-   DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(keySerializationStream);
-   if (valueBytes == null) {
-   keySerializationStream.reset();
-   
valueSerializer.serialize(foldFunction.fold(getDefaultValue(), value), out);
-   backend.db.put(columnFamily, writeOptions, key, 
keySerializationStream.toByteArray());
-   } else {
-   ACC oldValue = valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
-   ACC newValue = foldFunction.fold(oldValue, 
value);
-   keySerializationStream.reset();
-   valueSerializer.serialize(newValue, out);
-   backend.db.put(columnFamily, writeOptions, key, 
keySerializationStream.toByteArray());
-   }
-   } catch (Exception e) {
-   throw new RuntimeException("Error while adding data to 
RocksDB", e);
-   }
+   public void add(T value) throws Exception {
+   ACC accumulator = getInternal();
+   accumulator = accumulator == null ? getDefaultValue() : 
foldFunction.fold(accumulator, value);
--- End diff --

This does not seem consistent with the previous version. Should this be:
```java
accumulator = foldFunction.fold(accumulator == null ? getDefaultValue() : 
accumulator, value);
```
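
A self-contained sketch of the difference, using integer addition as a hypothetical fold function: with the code in the diff, the first value added to an empty state is dropped and only the default is stored.

```java
import java.util.function.BiFunction;

public class FoldAddSketch {
    // Hypothetical fold function: (accumulator, value) -> accumulator + value.
    static final BiFunction<Integer, Integer, Integer> FOLD = Integer::sum;

    // Shape of the diff: on empty state the incoming value is never folded in.
    public static Integer addAsInDiff(Integer stored, Integer defaultValue, Integer value) {
        return stored == null ? defaultValue : FOLD.apply(stored, value);
    }

    // Suggested shape: pick the starting accumulator first, then always fold.
    public static Integer addAsSuggested(Integer stored, Integer defaultValue, Integer value) {
        return FOLD.apply(stored == null ? defaultValue : stored, value);
    }

    public static void main(String[] args) {
        System.out.println(addAsInDiff(null, 0, 5));    // prints 0: the value 5 is lost
        System.out.println(addAsSuggested(null, 0, 5)); // prints 5
    }
}
```

Once a value is already stored, both variants give the same result; they differ only for the first `add` on an empty state.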


---


[GitHub] flink pull request #6156: [FLINK-9572] Extend InternalAppendingState with in...

2018-06-12 Thread azagrebin
GitHub user azagrebin opened a pull request:

https://github.com/apache/flink/pull/6156

[FLINK-9572] Extend InternalAppendingState with internal stored state access

## What is the purpose of the change

Extend InternalAppendingState with get and update methods for internal 
stored state. 
Implement them in concrete states in backends.

## Brief change log

  - *InternalAppendingState* now has the methods *getInternal* and 
*updateInternal*
  - Heap and RocksDB merging states implement the methods


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.
It should be covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (yes)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/azagrebin/flink FLINK-9572

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6156.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6156


commit f235b327d7db370b6e4457ecc863cbf2626b4488
Author: Andrey Zagrebin 
Date:   2018-06-12T15:24:20Z

[FLINK-9572] Extend InternalAppendingState with internal stored state access




---