[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5885


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-28 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184846476
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ---
@@ -190,4 +197,12 @@ protected void writeKeyWithGroupAndNamespace(
RocksDBKeySerializationUtils.writeKey(key, keySerializer, 
keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
RocksDBKeySerializationUtils.writeNameSpace(namespace, 
namespaceSerializer, keySerializationStream, keySerializationDataOutputView, 
ambiguousKeyPossible);
}
+
+   protected V getDefaultValue() {
--- End diff --

Since this PR is blocking a bug for the 1.5 release, I'll proceed to merge 
this as it is.
@bowenli86 Perhaps we can open a separate JIRA for this to keep this in 
mind?


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-27 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184841123
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ---
@@ -190,4 +197,12 @@ protected void writeKeyWithGroupAndNamespace(
RocksDBKeySerializationUtils.writeKey(key, keySerializer, 
keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
RocksDBKeySerializationUtils.writeNameSpace(namespace, 
namespaceSerializer, keySerializationStream, keySerializationDataOutputView, 
ambiguousKeyPossible);
}
+
+   protected V getDefaultValue() {
--- End diff --

Not too sure about this one.

That would require introducing 2 methods in the `InternalKvState`:
1. A getter method that returns the default value.
2. A default method that actually does the serialization copying of the 
default value (the current `getDefaultValue` method).
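
For reference, a minimal sketch of what that two-method shape could look like. This is only an illustration: `KvStateSketch`, `CopyingSerializer`, `ListStateSketch`, and `getRawDefaultValue` are hypothetical stand-ins, not the actual `InternalKvState` or `TypeSerializer` API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a serializer; only copy() is modelled here.
interface CopyingSerializer<V> {
    V copy(V value);
}

// Hypothetical stand-in for the state interface, reduced to the two methods discussed.
interface KvStateSketch<V> {
    // 1. Plain getter that every implementation has to provide.
    V getRawDefaultValue();

    CopyingSerializer<V> getValueSerializer();

    // 2. Default method that hands out a copy, so callers cannot mutate the shared default.
    default V getDefaultValue() {
        V raw = getRawDefaultValue();
        return raw == null ? null : getValueSerializer().copy(raw);
    }
}

// Example implementation that only supplies the getters and inherits the copying logic.
final class ListStateSketch implements KvStateSketch<List<String>> {
    private final List<String> defaultValue;

    ListStateSketch(List<String> defaultValue) {
        this.defaultValue = defaultValue;
    }

    @Override
    public List<String> getRawDefaultValue() {
        return defaultValue;
    }

    @Override
    public CopyingSerializer<List<String>> getValueSerializer() {
        return value -> new ArrayList<>(value);
    }
}
```

The cost is visible in the sketch: the interface gains an extra getter purely so that the default method has something to copy.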


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-27 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184840215
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 ---
@@ -455,4 +455,5 @@ public StreamCompressionDecorator 
getKeyGroupCompressionDecorator() {
@VisibleForTesting
public abstract int numStateEntries();
 
+
--- End diff --

Will revert 👍 


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-27 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184840230
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1116,148 +1115,177 @@ private void 
restoreKeyGroupsShardWithTemporaryHelperInstance(
// 

 
/**
-* Creates a column family handle for use with a k/v state. When 
restoring from a snapshot
-* we don't restore the individual k/v states, just the global RocksDB 
database and the
-* list of column families. When a k/v state is first requested we 
check here whether we
-* already have a column family for that and return it or create a new 
one if it doesn't exist.
+* Registers a k/v state information, which includes its state id, 
type, RocksDB column family handle, and serializers.
 *
-* This also checks whether the {@link StateDescriptor} for a state 
matches the one
-* that we checkpointed, i.e. is already in the map of column families.
+* When restoring from a snapshot, we don’t restore the individual 
k/v states, just the global RocksDB database and
+* the list of k/v state information. When a k/v state is first 
requested we check here whether we
+* already have a registered entry for that and return it (after some 
necessary state compatibility checks)
+* or create a new one if it does not exist.
 */
-   @SuppressWarnings("rawtypes, unchecked")
-   protected  ColumnFamilyHandle getColumnFamily(
-   StateDescriptor descriptor, TypeSerializer 
namespaceSerializer) throws IOException, StateMigrationException {
+   private Tuple2 tryRegisterKvStateInformation(
--- End diff --

Will change this as suggested!


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-27 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184830451
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 ---
@@ -455,4 +455,5 @@ public StreamCompressionDecorator 
getKeyGroupCompressionDecorator() {
@VisibleForTesting
public abstract int numStateEntries();
 
+
--- End diff --

revert this?


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-27 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184831125
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ---
@@ -190,4 +197,12 @@ protected void writeKeyWithGroupAndNamespace(
RocksDBKeySerializationUtils.writeKey(key, keySerializer, 
keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
RocksDBKeySerializationUtils.writeNameSpace(namespace, 
namespaceSerializer, keySerializationStream, keySerializationDataOutputView, 
ambiguousKeyPossible);
}
+
+   protected V getDefaultValue() {
--- End diff --

This method is duplicated across several implementation classes. We can move it to 
`InternalKvState` as a 
[default method](https://docs.oracle.com/javase/tutorial/java/IandI/defaultmethods.html).


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184696134
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1116,148 +1115,177 @@ private void 
restoreKeyGroupsShardWithTemporaryHelperInstance(
// 

 
/**
-* Creates a column family handle for use with a k/v state. When 
restoring from a snapshot
-* we don't restore the individual k/v states, just the global RocksDB 
database and the
-* list of column families. When a k/v state is first requested we 
check here whether we
-* already have a column family for that and return it or create a new 
one if it doesn't exist.
+* Registers a k/v state information, which includes its state id, 
type, RocksDB column family handle, and serializers.
 *
-* This also checks whether the {@link StateDescriptor} for a state 
matches the one
-* that we checkpointed, i.e. is already in the map of column families.
+* When restoring from a snapshot, we don’t restore the individual 
k/v states, just the global RocksDB database and
+* the list of k/v state information. When a k/v state is first 
requested we check here whether we
+* already have a registered entry for that and return it (after some 
necessary state compatibility checks)
+* or create a new one if it does not exist.
 */
-   @SuppressWarnings("rawtypes, unchecked")
-   protected  ColumnFamilyHandle getColumnFamily(
-   StateDescriptor descriptor, TypeSerializer 
namespaceSerializer) throws IOException, StateMigrationException {
+   private Tuple2 tryRegisterKvStateInformation(
--- End diff --

This method is rarely invoked, and the tuples are not immutable, so handing out a 
defensive copy also has its benefits. I feel this is cleaner than 
having casts all over the place. There might be other ways to solve 
the general problem, but this feels better to me than the initial version.
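
To illustrate the defensive-copy point, here is a small sketch under simplifying assumptions. `Pair` and `RegistrySketch` are hypothetical; `Pair` only mimics the public mutable fields `f0` and `f1` of a tuple, and strings stand in for the column family handle and the meta info.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mutable pair with public fields, mimicking a tuple with f0 and f1.
final class Pair<A, B> {
    A f0;
    B f1;

    Pair(A f0, B f1) {
        this.f0 = f0;
        this.f1 = f1;
    }

    static <A, B> Pair<A, B> of(A f0, B f1) {
        return new Pair<>(f0, f1);
    }
}

// Hypothetical registry: the internal map keeps its own pair, callers get a fresh one.
final class RegistrySketch {
    private final Map<String, Pair<String, String>> registered = new HashMap<>();

    Pair<String, String> tryRegister(String stateName, String metaInfo) {
        Pair<String, String> existing = registered.get(stateName);
        if (existing != null) {
            // Update the registered entry, then return a copy instead of the stored pair.
            existing.f1 = metaInfo;
            return Pair.of(existing.f0, metaInfo);
        }
        String handle = "cf-" + stateName;
        registered.put(stateName, Pair.of(handle, metaInfo));
        return Pair.of(handle, metaInfo);
    }

    String registeredMetaInfo(String stateName) {
        return registered.get(stateName).f1;
    }
}
```

Because the returned pair is never the stored one, a caller that overwrites `f1` on its copy leaves the registered entry untouched.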


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-27 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184694977
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1116,148 +1115,177 @@ private void 
restoreKeyGroupsShardWithTemporaryHelperInstance(
// 

 
/**
-* Creates a column family handle for use with a k/v state. When 
restoring from a snapshot
-* we don't restore the individual k/v states, just the global RocksDB 
database and the
-* list of column families. When a k/v state is first requested we 
check here whether we
-* already have a column family for that and return it or create a new 
one if it doesn't exist.
+* Registers a k/v state information, which includes its state id, 
type, RocksDB column family handle, and serializers.
 *
-* This also checks whether the {@link StateDescriptor} for a state 
matches the one
-* that we checkpointed, i.e. is already in the map of column families.
+* When restoring from a snapshot, we don’t restore the individual 
k/v states, just the global RocksDB database and
+* the list of k/v state information. When a k/v state is first 
requested we check here whether we
+* already have a registered entry for that and return it (after some 
necessary state compatibility checks)
+* or create a new one if it does not exist.
 */
-   @SuppressWarnings("rawtypes, unchecked")
-   protected  ColumnFamilyHandle getColumnFamily(
-   StateDescriptor descriptor, TypeSerializer 
namespaceSerializer) throws IOException, StateMigrationException {
+   private Tuple2 tryRegisterKvStateInformation(
--- End diff --

I did not go with this approach because of the extra tuples introduced.
Though, TBH, I wasn't sure which was the better approach, this or the one 
you pointed out.

I would not be against this version.


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184691691
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1116,148 +1115,177 @@ private void 
restoreKeyGroupsShardWithTemporaryHelperInstance(
// 

 
/**
-* Creates a column family handle for use with a k/v state. When 
restoring from a snapshot
-* we don't restore the individual k/v states, just the global RocksDB 
database and the
-* list of column families. When a k/v state is first requested we 
check here whether we
-* already have a column family for that and return it or create a new 
one if it doesn't exist.
+* Registers a k/v state information, which includes its state id, 
type, RocksDB column family handle, and serializers.
 *
-* This also checks whether the {@link StateDescriptor} for a state 
matches the one
-* that we checkpointed, i.e. is already in the map of column families.
+* When restoring from a snapshot, we don’t restore the individual 
k/v states, just the global RocksDB database and
+* the list of k/v state information. When a k/v state is first 
requested we check here whether we
+* already have a registered entry for that and return it (after some 
necessary state compatibility checks)
+* or create a new one if it does not exist.
 */
-   @SuppressWarnings("rawtypes, unchecked")
-   protected  ColumnFamilyHandle getColumnFamily(
-   StateDescriptor descriptor, TypeSerializer 
namespaceSerializer) throws IOException, StateMigrationException {
+   private Tuple2 tryRegisterKvStateInformation(
--- End diff --

We can also remove the lines that suppress warnings.


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184690567
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1116,148 +1115,177 @@ private void 
restoreKeyGroupsShardWithTemporaryHelperInstance(
// 

 
/**
-* Creates a column family handle for use with a k/v state. When 
restoring from a snapshot
-* we don't restore the individual k/v states, just the global RocksDB 
database and the
-* list of column families. When a k/v state is first requested we 
check here whether we
-* already have a column family for that and return it or create a new 
one if it doesn't exist.
+* Registers a k/v state information, which includes its state id, 
type, RocksDB column family handle, and serializers.
 *
-* This also checks whether the {@link StateDescriptor} for a state 
matches the one
-* that we checkpointed, i.e. is already in the map of column families.
+* When restoring from a snapshot, we don’t restore the individual 
k/v states, just the global RocksDB database and
+* the list of k/v state information. When a k/v state is first 
requested we check here whether we
+* already have a registered entry for that and return it (after some 
necessary state compatibility checks)
+* or create a new one if it does not exist.
 */
-   @SuppressWarnings("rawtypes, unchecked")
-   protected  ColumnFamilyHandle getColumnFamily(
-   StateDescriptor descriptor, TypeSerializer 
namespaceSerializer) throws IOException, StateMigrationException {
+   private Tuple2 tryRegisterKvStateInformation(
--- End diff --

We could rewrite this as
```
private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tryRegisterKvStateInformation(
        StateDescriptor<?, S> stateDesc,
        TypeSerializer<N> namespaceSerializer) throws StateMigrationException, IOException {

    Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredInfo =
        this.kvStateInformation.get(stateDesc.getName());

    if (registeredInfo != null) {

        @SuppressWarnings("unchecked")
        RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfoSnapshot =
            (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(stateDesc.getName());

        Preconditions.checkState(
            restoredMetaInfoSnapshot != null,
            "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
                " but its corresponding restored snapshot cannot be found.");

        RegisteredKeyedBackendStateMetaInfo<N, S> resolveKvStateCompatibility =
            RegisteredKeyedBackendStateMetaInfo.resolveKvStateCompatibility(
                restoredMetaInfoSnapshot,
                namespaceSerializer,
                stateDesc);

        registeredInfo.f1 = resolveKvStateCompatibility;

        return Tuple2.of(registeredInfo.f0, resolveKvStateCompatibility);
    } else {
        String stateName = stateDesc.getName();
        RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo =
            new RegisteredKeyedBackendStateMetaInfo<>(
                stateDesc.getType(),
                stateName,
                namespaceSerializer,
                stateDesc.getSerializer());

        ColumnFamilyHandle columnFamily = createColumnFamily(stateName);
        registeredInfo = Tuple2.of(columnFamily, newMetaInfo);
        this.kvStateInformation.put(stateDesc.getName(), registeredInfo);
        return Tuple2.of(columnFamily, newMetaInfo);
    }
}
```

and get rid of all the individual casts.


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184675164
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1116,148 +1115,177 @@ private void 
restoreKeyGroupsShardWithTemporaryHelperInstance(
// 

 
/**
-* Creates a column family handle for use with a k/v state. When 
restoring from a snapshot
-* we don't restore the individual k/v states, just the global RocksDB 
database and the
-* list of column families. When a k/v state is first requested we 
check here whether we
-* already have a column family for that and return it or create a new 
one if it doesn't exist.
+* Registers a k/v state information, which includes its state id, 
type, RocksDB column family handle, and serializers.
 *
-* This also checks whether the {@link StateDescriptor} for a state 
matches the one
-* that we checkpointed, i.e. is already in the map of column families.
+* When restoring from a snapshot, we don’t restore the individual 
k/v states, just the global RocksDB database and
+* the list of k/v state information. When a k/v state is first 
requested we check here whether we
+* already have a registered entry for that and return it (after some 
necessary state compatibility checks)
+* or create a new one if it does not exist.
 */
-   @SuppressWarnings("rawtypes, unchecked")
-   protected  ColumnFamilyHandle getColumnFamily(
-   StateDescriptor descriptor, TypeSerializer 
namespaceSerializer) throws IOException, StateMigrationException {
+   private Tuple2 tryRegisterKvStateInformation(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer) throws 
StateMigrationException, IOException {
 
Tuple2 stateInfo =
-   kvStateInformation.get(descriptor.getName());
-
-   RegisteredKeyedBackendStateMetaInfo newMetaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
-   descriptor.getType(),
-   descriptor.getName(),
-   namespaceSerializer,
-   descriptor.getSerializer());
+   kvStateInformation.get(stateDesc.getName());
 
+   RegisteredKeyedBackendStateMetaInfo newMetaInfo;
if (stateInfo != null) {
-   // TODO with eager registration in place, these checks 
should be moved to restore()
 
-   RegisteredKeyedBackendStateMetaInfo.Snapshot 
restoredMetaInfo =
-   
(RegisteredKeyedBackendStateMetaInfo.Snapshot) 
restoredKvStateMetaInfos.get(descriptor.getName());
+   @SuppressWarnings("unchecked")
+   RegisteredKeyedBackendStateMetaInfo.Snapshot 
restoredMetaInfoSnapshot =
+   
restoredKvStateMetaInfos.get(stateDesc.getName());
 
Preconditions.checkState(
-   Objects.equals(newMetaInfo.getName(), 
restoredMetaInfo.getName()),
-   "Incompatible state names. " +
-   "Was [" + restoredMetaInfo.getName() + 
"], " +
-   "registered with [" + 
newMetaInfo.getName() + "].");
-
-   if (!Objects.equals(newMetaInfo.getStateType(), 
StateDescriptor.Type.UNKNOWN)
-   && 
!Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) 
{
-
-   Preconditions.checkState(
-   newMetaInfo.getStateType() == 
restoredMetaInfo.getStateType(),
-   "Incompatible state types. " +
-   "Was [" + 
restoredMetaInfo.getStateType() + "], " +
-   "registered with [" + 
newMetaInfo.getStateType() + "].");
-   }
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored 
snapshot cannot be found.");
 
-   // check compatibility results to determine if state 
migration is required
-   CompatibilityResult namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
-  

---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-27 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184657676
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ---
@@ -61,8 +64,7 @@
/** The column family of this particular instance of state. */
protected ColumnFamilyHandle columnFamily;
 
-   /** State descriptor from which to create this state instance. */
-   protected final SD stateDesc;
--- End diff --

Fixed, thanks for catching this.


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-27 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184657584
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -185,36 +188,44 @@ public HeapKeyedStateBackend(

stateName.equals(stateTable.getMetaInfo().getName()),
"Incompatible state names. " +
"Was [" + 
stateTable.getMetaInfo().getName() + "], " +
-   "registered with [" + 
newMetaInfo.getName() + "].");
+   "registered with [" + stateName + "].");
 
-   if 
(!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
+   if (!stateType.equals(StateDescriptor.Type.UNKNOWN)
&& 
!stateTable.getMetaInfo().getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
 
Preconditions.checkState(
-   
newMetaInfo.getStateType().equals(stateTable.getMetaInfo().getStateType()),
+   
stateType.equals(stateTable.getMetaInfo().getStateType()),
"Incompatible state types. " +
"Was [" + 
stateTable.getMetaInfo().getStateType() + "], " +
-   "registered with [" + 
newMetaInfo.getStateType() + "].");
+   "registered with [" + stateType 
+ "].");
}
 
@SuppressWarnings("unchecked")
RegisteredKeyedBackendStateMetaInfo.Snapshot 
restoredMetaInfo =

(RegisteredKeyedBackendStateMetaInfo.Snapshot) 
restoredKvStateMetaInfos.get(stateName);
 
// check compatibility results to determine if state 
migration is required
+   TypeSerializer newNamespaceSerializer = 
namespaceSerializer.duplicate();
CompatibilityResult namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(

restoredMetaInfo.getNamespaceSerializer(),
null,

restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
-   newMetaInfo.getNamespaceSerializer());
+   newNamespaceSerializer);
 
+   TypeSerializer newValueSerializer = 
valueSerializer.duplicate();
--- End diff --

I've updated this according to your suggestion:
The reconfiguration method now gets a state descriptor as an argument to 
reduce confusion.


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-27 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184657398
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
 ---
@@ -49,17 +49,18 @@
/**
 * Creates a new key/value state for the given hash map of key/value 
pairs.
 *
-* @param stateDesc The state identifier for the state. This contains 
name
-*   and can create a default state value.
+* @param valueSerializer The serializer for the state.
 * @param stateTable The state tab;e to use in this kev/value state. 
May contain initial state.
 */
public HeapFoldingState(
-   FoldingStateDescriptor stateDesc,
StateTable stateTable,
TypeSerializer keySerializer,
-   TypeSerializer namespaceSerializer) {
-   super(stateDesc, stateTable, keySerializer, 
namespaceSerializer);
-   this.foldTransformation = new FoldTransformation<>(stateDesc);
+   TypeSerializer valueSerializer,
+   TypeSerializer namespaceSerializer,
+   ACC defaultValue,
--- End diff --

Javadocs for all state classes are now updated.


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-27 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184657419
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
 ---
@@ -103,17 +104,17 @@ public void add(T value) throws IOException {
 
private static final class FoldTransformation implements 
StateTransformationFunction {
 
-   private final FoldingStateDescriptor stateDescriptor;
+   private final HeapFoldingState stateRef;
private final FoldFunction foldFunction;
 
-   FoldTransformation(FoldingStateDescriptor stateDesc) {
-   this.stateDescriptor = 
Preconditions.checkNotNull(stateDesc);
-   this.foldFunction = 
Preconditions.checkNotNull(stateDesc.getFoldFunction());
+   FoldTransformation(FoldFunction foldFunction, 
HeapFoldingState stateRef) {
+   this.stateRef = Preconditions.checkNotNull(stateRef);
--- End diff --

Done 👌 


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-25 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184104126
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -185,36 +188,44 @@ public HeapKeyedStateBackend(

stateName.equals(stateTable.getMetaInfo().getName()),
"Incompatible state names. " +
"Was [" + 
stateTable.getMetaInfo().getName() + "], " +
-   "registered with [" + 
newMetaInfo.getName() + "].");
+   "registered with [" + stateName + "].");
 
-   if 
(!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
+   if (!stateType.equals(StateDescriptor.Type.UNKNOWN)
&& 
!stateTable.getMetaInfo().getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
 
Preconditions.checkState(
-   
newMetaInfo.getStateType().equals(stateTable.getMetaInfo().getStateType()),
+   
stateType.equals(stateTable.getMetaInfo().getStateType()),
"Incompatible state types. " +
"Was [" + 
stateTable.getMetaInfo().getStateType() + "], " +
-   "registered with [" + 
newMetaInfo.getStateType() + "].");
+   "registered with [" + stateType 
+ "].");
}
 
@SuppressWarnings("unchecked")
RegisteredKeyedBackendStateMetaInfo.Snapshot 
restoredMetaInfo =

(RegisteredKeyedBackendStateMetaInfo.Snapshot) 
restoredKvStateMetaInfos.get(stateName);
 
// check compatibility results to determine if state 
migration is required
+   TypeSerializer newNamespaceSerializer = 
namespaceSerializer.duplicate();
CompatibilityResult namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(

restoredMetaInfo.getNamespaceSerializer(),
null,

restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
-   newMetaInfo.getNamespaceSerializer());
+   newNamespaceSerializer);
 
+   TypeSerializer newValueSerializer = 
valueSerializer.duplicate();
--- End diff --

Similar to my comment on the Rocks code, this duplicate seems redundant, 
because the serializer also comes straight from a `StateDescriptor` which 
duplicates before handing it out. One thing to consider when removing this 
call: it is just less obvious here that the serializer was already duplicated, 
so maybe it would be good to pass the state descriptor as argument and get the 
serializer directly here to avoid any surprises for people working on this in 
the future.
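
A rough sketch of that idea, with hypothetical stand-in classes (`SerializerSketch`, `DescriptorSketch`, `BackendSketch`) rather than the real backend code. The only behavior assumed is the one described above: the descriptor duplicates its serializer before handing it out.

```java
// Hypothetical stand-in for a serializer that may hold internal state.
final class SerializerSketch {
    SerializerSketch duplicate() {
        return new SerializerSketch();
    }
}

// Hypothetical stand-in for a state descriptor that duplicates before handing out.
final class DescriptorSketch {
    private final SerializerSketch serializer = new SerializerSketch();

    SerializerSketch getSerializer() {
        return serializer.duplicate();
    }
}

// Hypothetical backend helper.
final class BackendSketch {
    // Taking the descriptor (not a bare serializer) makes it visible at this spot
    // that the instance obtained here is already a private copy.
    static SerializerSketch serializerFor(DescriptorSketch descriptor) {
        return descriptor.getSerializer();
    }
}
```

With the descriptor as the argument, a second `duplicate()` call inside the helper is visibly unnecessary, which is harder to see when only a serializer of unknown origin is passed in.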


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-25 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184101616
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1125,59 +1125,62 @@ private void 
restoreKeyGroupsShardWithTemporaryHelperInstance(
 * that we checkpointed, i.e. is already in the map of column families.
 */
@SuppressWarnings("rawtypes, unchecked")
-   protected  ColumnFamilyHandle getColumnFamily(
+   protected  Tuple2> getColumnFamilyAndStateSerializer(
StateDescriptor descriptor, TypeSerializer 
namespaceSerializer) throws IOException, StateMigrationException {
 
Tuple2 stateInfo =
kvStateInformation.get(descriptor.getName());
 
-   RegisteredKeyedBackendStateMetaInfo newMetaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
-   descriptor.getType(),
-   descriptor.getName(),
-   namespaceSerializer,
-   descriptor.getSerializer());
-
if (stateInfo != null) {
// TODO with eager registration in place, these checks 
should be moved to restore()
 
RegisteredKeyedBackendStateMetaInfo.Snapshot 
restoredMetaInfo =

(RegisteredKeyedBackendStateMetaInfo.Snapshot) 
restoredKvStateMetaInfos.get(descriptor.getName());
 
Preconditions.checkState(
-   Objects.equals(newMetaInfo.getName(), 
restoredMetaInfo.getName()),
+   Objects.equals(descriptor.getName(), 
restoredMetaInfo.getName()),
"Incompatible state names. " +
"Was [" + restoredMetaInfo.getName() + 
"], " +
-   "registered with [" + 
newMetaInfo.getName() + "].");
+   "registered with [" + 
descriptor.getName() + "].");
 
-   if (!Objects.equals(newMetaInfo.getStateType(), 
StateDescriptor.Type.UNKNOWN)
+   if (!Objects.equals(descriptor.getType(), 
StateDescriptor.Type.UNKNOWN)
&& 
!Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) 
{
 
Preconditions.checkState(
-   newMetaInfo.getStateType() == 
restoredMetaInfo.getStateType(),
+   descriptor.getType() == 
restoredMetaInfo.getStateType(),
"Incompatible state types. " +
"Was [" + 
restoredMetaInfo.getStateType() + "], " +
-   "registered with [" + 
newMetaInfo.getStateType() + "].");
+   "registered with [" + 
descriptor.getType() + "].");
}
 
// check compatibility results to determine if state 
migration is required
+   TypeSerializer newNamespaceSerializer = 
namespaceSerializer.duplicate();
CompatibilityResult namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
restoredMetaInfo.getNamespaceSerializer(),
null,

restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
-   newMetaInfo.getNamespaceSerializer());
+   newNamespaceSerializer);
 
+   TypeSerializer newStateSerializer = 
descriptor.getSerializer().duplicate();
--- End diff --

The `duplicate()` here looks redundant because it comes from the descriptor 
that already duplicates.


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-25 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184098081
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -185,36 +188,44 @@ public HeapKeyedStateBackend(

stateName.equals(stateTable.getMetaInfo().getName()),
"Incompatible state names. " +
"Was [" + 
stateTable.getMetaInfo().getName() + "], " +
-   "registered with [" + 
newMetaInfo.getName() + "].");
+   "registered with [" + stateName + "].");
 
-   if 
(!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
+   if (!stateType.equals(StateDescriptor.Type.UNKNOWN)
&& 
!stateTable.getMetaInfo().getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
 
Preconditions.checkState(
-   
newMetaInfo.getStateType().equals(stateTable.getMetaInfo().getStateType()),
+   
stateType.equals(stateTable.getMetaInfo().getStateType()),
"Incompatible state types. " +
"Was [" + 
stateTable.getMetaInfo().getStateType() + "], " +
-   "registered with [" + 
newMetaInfo.getStateType() + "].");
+   "registered with [" + stateType 
+ "].");
}
 
@SuppressWarnings("unchecked")
RegisteredKeyedBackendStateMetaInfo.Snapshot 
restoredMetaInfo =

(RegisteredKeyedBackendStateMetaInfo.Snapshot) 
restoredKvStateMetaInfos.get(stateName);
 
// check compatibility results to determine if state 
migration is required
+   TypeSerializer newNamespaceSerializer = 
namespaceSerializer.duplicate();
--- End diff --

Just curious: why do we need to duplicate the serializer here, but not in 
the other places where `resolveCompatibilityResult()` is called? Put 
differently, should `resolveCompatibilityResult()` always duplicate 
internally, never duplicate, or is the current behavior intended?


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-25 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184093390
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
 ---
@@ -103,17 +104,17 @@ public void add(T value) throws IOException {
 
private static final class FoldTransformation implements 
StateTransformationFunction {
 
-   private final FoldingStateDescriptor stateDescriptor;
+   private final HeapFoldingState stateRef;
private final FoldFunction foldFunction;
 
-   FoldTransformation(FoldingStateDescriptor stateDesc) {
-   this.stateDescriptor = 
Preconditions.checkNotNull(stateDesc);
-   this.foldFunction = 
Preconditions.checkNotNull(stateDesc.getFoldFunction());
+   FoldTransformation(FoldFunction foldFunction, 
HeapFoldingState stateRef) {
+   this.stateRef = Preconditions.checkNotNull(stateRef);
--- End diff --

Maybe the simpler and more honest way is to make this a non-static inner 
class instead of passing a reference to `HeapFoldingState.this` in the 
constructor. Essentially, it does the same thing.
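
To make the two options concrete, here is a minimal sketch. It is not the PR's code: `FoldingStateSketch`, `StaticTransformation`, and `InnerTransformation` are hypothetical names, and `BiFunction` stands in for Flink's `FoldFunction`. Variant A passes the outer instance explicitly, variant B relies on the implicit reference that a non-static inner class already holds.

```java
import java.util.function.BiFunction;

// Hypothetical stand-in for a state class; this is not Flink's HeapFoldingState.
final class FoldingStateSketch<T, ACC> {

    private final ACC defaultValue;
    private final BiFunction<ACC, T, ACC> foldFunction;

    FoldingStateSketch(ACC defaultValue, BiFunction<ACC, T, ACC> foldFunction) {
        this.defaultValue = defaultValue;
        this.foldFunction = foldFunction;
    }

    ACC getDefaultValue() {
        return defaultValue;
    }

    // Variant A: static nested class, the outer instance is passed in explicitly.
    static final class StaticTransformation<T, ACC> {
        private final FoldingStateSketch<T, ACC> stateRef;

        StaticTransformation(FoldingStateSketch<T, ACC> stateRef) {
            this.stateRef = stateRef;
        }

        ACC apply(ACC previous, T value) {
            ACC base = (previous != null) ? previous : stateRef.getDefaultValue();
            return stateRef.foldFunction.apply(base, value);
        }
    }

    // Variant B: non-static inner class, the outer instance is implicit.
    final class InnerTransformation {
        ACC apply(ACC previous, T value) {
            ACC base = (previous != null) ? previous : getDefaultValue();
            return foldFunction.apply(base, value);
        }
    }
}
```

Both variants hold a reference to the enclosing state object; variant B just does not spell it out in a constructor. An instance of the inner class would be created with `state.new InnerTransformation()`.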


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-25 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184087656
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
 ---
@@ -49,17 +49,18 @@
/**
 * Creates a new key/value state for the given hash map of key/value 
pairs.
 *
-* @param stateDesc The state identifier for the state. This contains 
name
-*   and can create a default state value.
+* @param valueSerializer The serializer for the state.
 * @param stateTable The state tab;e to use in this kev/value state. 
May contain initial state.
 */
public HeapFoldingState(
-   FoldingStateDescriptor stateDesc,
StateTable stateTable,
TypeSerializer keySerializer,
-   TypeSerializer namespaceSerializer) {
-   super(stateDesc, stateTable, keySerializer, 
namespaceSerializer);
-   this.foldTransformation = new FoldTransformation<>(stateDesc);
+   TypeSerializer valueSerializer,
+   TypeSerializer namespaceSerializer,
+   ACC defaultValue,
--- End diff --

I think you need to double-check this on every state class; none of them 
appear to be updated.


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-25 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184087107
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
 ---
@@ -42,33 +42,35 @@
/** Map containing the actual key/value pairs. */
protected final StateTable stateTable;
 
-   /** This holds the name of the state and can create an initial default 
value for the state. */
-   protected final SD stateDesc;
-
/** The current namespace, which the access methods will refer to. */
protected N currentNamespace;
 
protected final TypeSerializer keySerializer;
 
+   protected final TypeSerializer valueSerializer;
+
protected final TypeSerializer namespaceSerializer;
 
+   private final SV defaultValue;
+
/**
 * Creates a new key/value state for the given hash map of key/value 
pairs.
 *
-* @param stateDesc The state identifier for the state. This contains 
name
-*   and can create a default state value.
+* @param valueSerializer The serializer for the state.
 * @param stateTable The state tab;e to use in this kev/value state. 
May contain initial state.
 */
protected AbstractHeapState(
-   SD stateDesc,
StateTable stateTable,
TypeSerializer keySerializer,
-   TypeSerializer namespaceSerializer) {
+   TypeSerializer valueSerializer,
+   TypeSerializer namespaceSerializer,
--- End diff --

Comments require update.


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-25 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184087357
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
 ---
@@ -49,17 +49,18 @@
/**
 * Creates a new key/value state for the given hash map of key/value 
pairs.
 *
-* @param stateDesc The state identifier for the state. This contains 
name
-*   and can create a default state value.
+* @param valueSerializer The serializer for the state.
 * @param stateTable The state tab;e to use in this kev/value state. 
May contain initial state.
 */
public HeapFoldingState(
-   FoldingStateDescriptor stateDesc,
StateTable stateTable,
TypeSerializer keySerializer,
-   TypeSerializer namespaceSerializer) {
-   super(stateDesc, stateTable, keySerializer, 
namespaceSerializer);
-   this.foldTransformation = new FoldTransformation<>(stateDesc);
+   TypeSerializer valueSerializer,
+   TypeSerializer namespaceSerializer,
+   ACC defaultValue,
--- End diff --

Comments require update.


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-25 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184086555
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
 ---
@@ -47,21 +47,23 @@
/**
 * Creates a new key/value state for the given hash map of key/value 
pairs.
 *
-* @param stateDesc
-* The state identifier for the state. This contains name 
and can create a default state value.
+* @param valueSerializer
+* The serializer for the state.
 * @param stateTable
 * The state table to use in this kev/value state. May 
contain initial state.
 * @param namespaceSerializer
 * The serializer for the type that indicates the namespace
 */
public HeapAggregatingState(
-   AggregatingStateDescriptor stateDesc,
StateTable stateTable,
TypeSerializer keySerializer,
-   TypeSerializer namespaceSerializer) {
+   TypeSerializer valueSerializer,
+   TypeSerializer namespaceSerializer,
+   ACC defaultValue,
--- End diff --

The comments are not updated to reflect this and the next line.


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-25 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184085595
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ---
@@ -61,8 +64,7 @@
/** The column family of this particular instance of state. */
protected ColumnFamilyHandle columnFamily;
 
-   /** State descriptor from which to create this state instance. */
-   protected final SD stateDesc;
--- End diff --

The same as for the heap state applies here: you can remove `SD` from the 
generic types of this class and its subclasses.


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-25 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184084995
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
 ---
@@ -42,33 +42,35 @@
/** Map containing the actual key/value pairs. */
protected final StateTable stateTable;
 
-   /** This holds the name of the state and can create an initial default 
value for the state. */
-   protected final SD stateDesc;
--- End diff --

I think that by removing this, you can also remove `SD` from the generic 
type of this class and, transitively, from the subclasses as well.


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-25 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r183997653
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -169,13 +169,16 @@ public HeapKeyedStateBackend(
TypeSerializer namespaceSerializer,
TypeSerializer valueSerializer) throws 
StateMigrationException {
 
-   final RegisteredKeyedBackendStateMetaInfo newMetaInfo =
-   new 
RegisteredKeyedBackendStateMetaInfo<>(stateType, stateName, 
namespaceSerializer, valueSerializer);
-
@SuppressWarnings("unchecked")
StateTable stateTable = (StateTable) 
stateTables.get(stateName);
 
if (stateTable == null) {
+   RegisteredKeyedBackendStateMetaInfo newMetaInfo = 
new RegisteredKeyedBackendStateMetaInfo<>(
--- End diff --

The mixing of concerns here is not yet as bad as in the RocksDB backend 
code, but you might start separating things a bit more here as well.


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-25 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r183996477
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1125,59 +1125,62 @@ private void 
restoreKeyGroupsShardWithTemporaryHelperInstance(
 * that we checkpointed, i.e. is already in the map of column families.
 */
@SuppressWarnings("rawtypes, unchecked")
-   protected  ColumnFamilyHandle getColumnFamily(
+   protected  Tuple2> getColumnFamilyAndStateSerializer(
--- End diff --

I think this method has grown way too complex over time, and the `Tuple2` 
return type makes it increasingly clear that this code mixes two different 
concerns and could be untangled a bit. I would suggest separating this into: 
1) Checking whether this is a new state (does the map contain the name 
string?). This can be an inlined check in the current calling code. 
2) If yes, doing the serializer checks and configuration magic and creating 
the `RegisteredKeyedBackendStateMetaInfo`. This goes into a separate method 
that is called by the current caller.
3) Requesting the column family, either through a new registration or by 
looking up the existing one. This can use the result from step 1 or recheck, 
and goes into another separate method called by the current caller.
x) Optional: a helper method that performs steps 1-3, if we would otherwise 
duplicate them too much.
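
The separation into steps 1-3 plus an optional helper could look roughly like the sketch below. Everything in it is hypothetical: `StateRegistrySketch`, `MetaInfo`, and `ColumnFamily` are simplified stand-ins, a plain string replaces the serializer, and the "compatibility check" is a toy equality test rather than Flink's actual logic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the suggested split; none of these types are Flink's.
final class StateRegistrySketch {

    static final class MetaInfo {
        final String stateName;
        final String serializerId; // stand-in for a real serializer

        MetaInfo(String stateName, String serializerId) {
            this.stateName = stateName;
            this.serializerId = serializerId;
        }
    }

    static final class ColumnFamily {
        final String name;

        ColumnFamily(String name) {
            this.name = name;
        }
    }

    private final Map<String, MetaInfo> metaInfos = new HashMap<>();
    private final Map<String, ColumnFamily> columnFamilies = new HashMap<>();

    // Step 1: is this a state name we have not seen yet?
    boolean isNewState(String stateName) {
        return !columnFamilies.containsKey(stateName);
    }

    // Step 2: serializer checks (toy version) and creation of the meta info.
    MetaInfo createMetaInfo(String stateName, String serializerId) {
        MetaInfo known = metaInfos.get(stateName);
        if (known != null && !known.serializerId.equals(serializerId)) {
            throw new IllegalStateException("State migration isn't supported, yet.");
        }
        return new MetaInfo(stateName, serializerId);
    }

    // Step 3: return the existing column family or register a new one.
    ColumnFamily getOrRegisterColumnFamily(String stateName) {
        return columnFamilies.computeIfAbsent(stateName, ColumnFamily::new);
    }

    // Optional helper that chains steps 1-3 for callers that need all of them.
    ColumnFamily register(String stateName, String serializerId) {
        if (isNewState(stateName)) {
            metaInfos.put(stateName, createMetaInfo(stateName, serializerId));
        }
        return getOrRegisterColumnFamily(stateName);
    }
}
```

With this shape no method needs a `Tuple2` return type, because the meta info and the column family are obtained through separate calls.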


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r183309701
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1177,7 +1177,7 @@ private void 
restoreKeyGroupsShardWithTemporaryHelperInstance(
throw new StateMigrationException("State 
migration isn't supported, yet.");
} else {
stateInfo.f1 = newMetaInfo;
-   return stateInfo.f0;
+   return Tuple2.of(stateInfo.f0, 
newMetaInfo.getStateSerializer());
--- End diff --

I agree that the meta info should be immutable and that the compatibility 
check result should carry the compatible, reconfigured serializer.

However, one issue is that this requires changes to the 
`CompatibilityResult` interface, which is part of the public API. I would 
prefer not to touch the API now, as we're approaching the release. It would be 
possible to bypass this by introducing an internal compatibility result class, 
but the downsides are that 1) it would have an almost identical implementation 
to `CompatibilityResult`, and 2) it would entail touching a lot of our more 
complex serializers' code, because they use 
`CompatibilityUtil.resolveCompatibilityResult`.


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-21 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r183214737
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1177,7 +1177,7 @@ private void 
restoreKeyGroupsShardWithTemporaryHelperInstance(
throw new StateMigrationException("State 
migration isn't supported, yet.");
} else {
stateInfo.f1 = newMetaInfo;
-   return stateInfo.f0;
+   return Tuple2.of(stateInfo.f0, 
newMetaInfo.getStateSerializer());
--- End diff --

Mirroring the result of an offline discussion here:

This is a bit fragile: the `newMetaInfo` is mutable, the serializer is 
altered in there and then obtained from there again. That makes the code 
harder for future maintainers to follow and easy to break accidentally.

The meta info should be immutable, and the reconfigured serializer (or the 
original, if no reconfiguration is needed) would probably best be part of the 
compatibility result.
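
As a rough illustration of that shape (not Flink's actual `CompatibilityResult`, whose API is left untouched in this PR), the sketch below uses hypothetical types `CompatSketch.Result`, `CompatSketch.MetaInfo`, and `CompatSketch.Serializer`. The compatibility rule is a toy string comparison, chosen only to keep the example self-contained.

```java
import java.util.Objects;

// Hypothetical sketch: an immutable meta info plus a result object that
// carries the serializer to use. These are not Flink classes.
final class CompatSketch {

    static final class Serializer {
        final String config;

        Serializer(String config) {
            this.config = config;
        }
    }

    // Immutable: all fields are final and there are no setters.
    static final class MetaInfo {
        final String name;
        final Serializer serializer;

        MetaInfo(String name, Serializer serializer) {
            this.name = name;
            this.serializer = serializer;
        }
    }

    static final class Result {
        private final Serializer serializer; // null means migration is required

        private Result(Serializer serializer) {
            this.serializer = serializer;
        }

        static Result compatible(Serializer reconfigured) {
            return new Result(Objects.requireNonNull(reconfigured));
        }

        static Result requiresMigration() {
            return new Result(null);
        }

        boolean isRequiresMigration() {
            return serializer == null;
        }

        Serializer getSerializer() {
            if (serializer == null) {
                throw new IllegalStateException("No serializer: migration is required.");
            }
            return serializer;
        }
    }

    // Toy rule (an assumption for illustration): equal configs are compatible.
    static Result resolve(MetaInfo restored, Serializer requested) {
        if (restored.serializer.config.equals(requested.config)) {
            return Result.compatible(requested);
        }
        return Result.requiresMigration();
    }
}
```

The caller then takes the serializer from the result instead of reading it back out of a meta info object that was mutated as a side effect.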


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-20 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5885

[FLINK-8715] Remove usage of StateDescriptor in state handles

## What is the purpose of the change

This PR is WIP, and is still lacking test coverage.
It is opened now to collect some feedback for a proposed solution for 
FLINK-8715.

Previously, reconfigured state serializers on restore were not properly 
forwarded to the state handles. In the past, the `StateDescriptor` served as 
the holder for the reconfigured serializer.
However, since 88ffad27, `StateDescriptor#getSerializer()` has handed out 
duplicates of the serializer, so the reconfigured serializer ended up being a 
completely different copy than the one the state handles were using.

This fix corrects this by explicitly forwarding the serializer to the 
instantiated state handles after the state is registered at the state backend. 
It also eliminates the use of `StateDescriptor`s internally in the state 
handles, so that the behaviour is independent of the 
`StateDescriptor#getSerializer()` method's implementation.
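
The effect can be reproduced with a small, self-contained sketch. The classes below are hypothetical simplifications (`Serializer`, `Descriptor`, and `Handle` are not Flink types); the only behaviour taken from the description above is that the descriptor's getter returns a duplicate on every call.

```java
// Hypothetical sketch of the bug and the fix; these are not Flink classes.
final class DuplicateSketch {

    static final class Serializer {
        private String config;

        Serializer(String config) {
            this.config = config;
        }

        Serializer duplicate() {
            return new Serializer(config);
        }

        void reconfigure(String newConfig) {
            this.config = newConfig;
        }

        String config() {
            return config;
        }
    }

    static final class Descriptor {
        private final Serializer serializer;

        Descriptor(Serializer serializer) {
            this.serializer = serializer;
        }

        // Every call hands out a fresh copy.
        Serializer getSerializer() {
            return serializer.duplicate();
        }
    }

    static final class Handle {
        final Serializer serializer;

        Handle(Serializer serializer) {
            this.serializer = serializer;
        }
    }

    public static void main(String[] args) {
        Descriptor descriptor = new Descriptor(new Serializer("old"));

        // Registration reconfigures one particular copy.
        Serializer registered = descriptor.getSerializer();
        registered.reconfigure("new");

        // Asking the descriptor again yields an unrelated, unconfigured copy.
        Handle viaDescriptor = new Handle(descriptor.getSerializer());
        // Forwarding the registered instance keeps the reconfiguration.
        Handle viaForwarding = new Handle(registered);

        System.out.println(viaDescriptor.serializer.config());
        System.out.println(viaForwarding.serializer.config());
    }
}
```

The handle built from the descriptor never sees the reconfiguration, while the handle that receives the forwarded instance does, which is the behaviour this fix aims for.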

The alternative to this approach is to have an internal `setSerializer` 
method on the `StateDescriptor`, which would be called after state serializers 
are reconfigured on registration.
That would ensure that serializers handed out by the descriptor are always 
reconfigured as soon as the descriptor is registered at the backend.

## Brief change log

- Remove `StateDescriptor`s from heap / RocksDB state handle classes
- Forward the state serializer and any other necessary information provided 
by the state descriptor (e.g. default value, user functions, nested 
serializers, etc.) when instantiating state handles.

## Verifying this change

This fix still lacks test coverage.
It has been opened to collect feedback for the approach.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (**yes** / no / don't know)
  - The runtime per-record code paths (performance sensitive): (**yes** / 
no / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8715

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5885.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5885


commit c092dd6518d9e6f47f4cfc797c18bedc8a89cc05
Author: Tzu-Li (Gordon) Tai 
Date:   2018-04-20T13:15:42Z

[FLINK-8715] Remove usage of StateDescriptor in state handles




---