[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...

2017-05-14 Thread StefanRRichter
Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/3870


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...

2017-05-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3870#discussion_r116179467
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---
@@ -18,91 +18,137 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executor;
 
 /**
  * A {@code SharedStateRegistry} will be deployed in the 
- * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to 
+ * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to
  * maintain the reference count of {@link SharedStateHandle}s which are shared
- * among different checkpoints.
- *
+ * among different incremental checkpoints.
  */
 public class SharedStateRegistry {
 
private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistry.class);
 
/** All registered state objects by an artificial key */
-   private final Map registeredStates;
+   private final Map registeredStates;
+
+   /** Executor for async state deletion */
+   private final Executor asyncDisposalExecutor;
 
public SharedStateRegistry() {
this.registeredStates = new HashMap<>();
+   this.asyncDisposalExecutor = Executors.directExecutor(); //TODO: FLINK-6534
--- End diff --

I totally agree that there should not be a new executor, which is why I marked it with the TODO. This is just a preparation for the full fix of FLINK-6534. My plan for the full fix is to pass the IO executor from the `CompletedCheckpointStore` and use it inside the registry, so that state disposal happens outside of any synchronization. For now, this code is a working placeholder for the full fix, which I will do as a follow-up.
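A minimal sketch of the planned shape, assuming the executor is injected from outside (for example, an IO executor owned by the checkpoint store) instead of being created by the registry. `ExecutorBackedRegistry` and `DisposableState` are hypothetical names for illustration, not Flink classes; the point is only that reference counts change under the lock while the actual discard is handed to the executor after the lock is released.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for a shared state handle that can be discarded. */
interface DisposableState {
    void discardState() throws Exception;
}

/** Hypothetical registry that receives its disposal executor from the caller. */
class ExecutorBackedRegistry {
    private final Map<String, DisposableState> states = new HashMap<>();
    private final Map<String, Integer> refCounts = new HashMap<>();
    private final Executor disposalExecutor;

    ExecutorBackedRegistry(Executor disposalExecutor) {
        this.disposalExecutor = disposalExecutor;
    }

    /** Adds one reference; the first handle registered under a key is kept. */
    synchronized void register(String key, DisposableState state) {
        states.putIfAbsent(key, state);
        refCounts.merge(key, 1, Integer::sum);
    }

    /** Drops one reference; if it was the last one, discards the state outside the lock. */
    void unregister(String key) {
        DisposableState toDiscard = null;
        synchronized (this) {
            Integer count = refCounts.get(key);
            if (count == null) {
                throw new IllegalStateException("Not registered: " + key);
            }
            if (count == 1) {
                refCounts.remove(key);
                toDiscard = states.remove(key);
            } else {
                refCounts.put(key, count - 1);
            }
        }
        // The monitor is released here; only the executor touches the state from now on.
        if (toDiscard != null) {
            final DisposableState state = toDiscard;
            disposalExecutor.execute(() -> {
                try {
                    state.discardState();
                } catch (Exception e) {
                    // A real implementation would log the failure here.
                }
            });
        }
    }
}
```

Passing `Runnable::run` as the executor makes disposal synchronous, similar in effect to the current `Executors.directExecutor()` placeholder; passing an IO executor would make it asynchronous without changing the locking.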




[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...

2017-05-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3870#discussion_r116175297
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java ---
@@ -180,69 +176,66 @@ public long getStateSize() {
 
@Override
public void registerSharedStates(SharedStateRegistry stateRegistry) {
+
Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
 
-   for (Map.Entry newSstFileEntry : newSstFiles.entrySet()) {
-   SstFileStateHandle stateHandle = new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue());
+   for (Map.Entry newSstFileEntry : unregisteredSstFiles.entrySet()) {
--- End diff --

For FLINK-6545 we need to make the savepoint serializer aware of the new incremental handles, but they currently live in the RocksDB package, where the savepoint classes cannot see them.

I am already working on making incremental snapshots a higher-level concept that is less tightly coupled to RocksDB. I think we can then also have incremental snapshots for other backends, e.g. the memory-based one. The abstraction is simply that every incremental snapshot can be divided into backend meta data, private data, newly created shared data, and referenced shared data. The placeholder handle will also become publicly available. Would this address your comments? I could already make those preparations part of this PR.
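To make the proposed four-way split concrete, here is a minimal sketch of a backend-independent incremental snapshot. The class and field names are hypothetical, and handles are modeled as plain strings (e.g. file paths); referenced shared data is carried by registration key only, in the spirit of the placeholder handle.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical backend-independent description of one incremental snapshot. */
final class IncrementalSnapshotSketch {
    /** Backend meta data, e.g. serialized state descriptors. */
    final String metaData;
    /** Data owned only by this snapshot; discarded together with it. */
    final Map<String, String> privateState;
    /** Shared data first written by this snapshot: key -> handle. */
    final Map<String, String> newSharedState;
    /** Shared data written by earlier snapshots, referenced by key only. */
    final Set<String> referencedSharedKeys;

    IncrementalSnapshotSketch(
            String metaData,
            Map<String, String> privateState,
            Map<String, String> newSharedState,
            Set<String> referencedSharedKeys) {
        this.metaData = metaData;
        this.privateState = Collections.unmodifiableMap(new HashMap<>(privateState));
        this.newSharedState = Collections.unmodifiableMap(new HashMap<>(newSharedState));
        this.referencedSharedKeys = Collections.unmodifiableSet(new HashSet<>(referencedSharedKeys));
    }

    /** All keys this snapshot must hold a reference to in a shared state registry. */
    Set<String> allSharedKeys() {
        Set<String> keys = new HashSet<>(newSharedState.keySet());
        keys.addAll(referencedSharedKeys);
        return keys;
    }
}
```

Nothing in this shape depends on RocksDB, which is the property that would let other backends and the savepoint serializer work with it.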




[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...

2017-05-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3870#discussion_r116174101
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java ---
@@ -180,69 +176,66 @@ public long getStateSize() {
 
@Override
public void registerSharedStates(SharedStateRegistry stateRegistry) {
+
Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
 
-   for (Map.Entry newSstFileEntry : newSstFiles.entrySet()) {
-   SstFileStateHandle stateHandle = new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue());
+   for (Map.Entry newSstFileEntry : unregisteredSstFiles.entrySet()) {
+   SharedStateRegistryKey registryKey =
+   createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
 
-   int referenceCount = stateRegistry.register(stateHandle);
-   Preconditions.checkState(referenceCount == 1);
+   SharedStateRegistry.Result result =
+   stateRegistry.registerNewReference(registryKey, newSstFileEntry.getValue());
+
+   // We update our reference with the result from the registry, to prevent the following
+   // problem:
+   // A previous checkpoint n has already registered the state. This can happen if a
+   // following checkpoint (n + x) wants to reference the same state before the backend got
+   // notified that checkpoint n completed. In this case, the shared registry did
+   // deduplication and returns the previous reference.
+   newSstFileEntry.setValue(result.getReference());
}
 
-   for (Map.Entry oldSstFileEntry : oldSstFiles.entrySet()) {
-   SstFileStateHandle stateHandle = new SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue());
+   for (Map.Entry oldSstFileName : registeredSstFiles.entrySet()) {
+   SharedStateRegistryKey registryKey =
+   createSharedStateRegistryKeyFromFileName(oldSstFileName.getKey());
+
+   SharedStateRegistry.Result result = stateRegistry.obtainReference(registryKey);
 
-   int referenceCount = stateRegistry.register(stateHandle);
-   Preconditions.checkState(referenceCount > 1);
+   // Again we update our state handle with the result from the registry, thus replacing
+   // placeholder state handles with the originals.
+   oldSstFileName.setValue(result.getReference());
}
 
+   // Migrate state from unregistered to registered, so that it will not count as private state
+   // for #discardState() from now.
+   registeredSstFiles.putAll(unregisteredSstFiles);
+   unregisteredSstFiles.clear();
+
registered = true;
}
 
@Override
public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
+
Preconditions.checkState(registered, "The state handle has not registered its shared states yet.");
 
-   for (Map.Entry newSstFileEntry : newSstFiles.entrySet()) {
-   stateRegistry.unregister(new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue()));
+   for (Map.Entry newSstFileEntry : unregisteredSstFiles.entrySet()) {
--- End diff --

Yes, this is not required.




[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...

2017-05-12 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3870#discussion_r116172093
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---
@@ -18,91 +18,137 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executor;
 
 /**
  * A {@code SharedStateRegistry} will be deployed in the 
- * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to 
+ * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to
  * maintain the reference count of {@link SharedStateHandle}s which are shared
- * among different checkpoints.
- *
+ * among different incremental checkpoints.
  */
 public class SharedStateRegistry {
 
private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistry.class);
 
/** All registered state objects by an artificial key */
-   private final Map registeredStates;
+   private final Map registeredStates;
+
+   /** Executor for async state deletion */
+   private final Executor asyncDisposalExecutor;
 
public SharedStateRegistry() {
this.registeredStates = new HashMap<>();
+   this.asyncDisposalExecutor = Executors.directExecutor(); //TODO: FLINK-6534
--- End diff --

I prefer not to use another asynchronous executor here.

In my initial implementation of `SharedStateRegistry`, unreferenced shared states are not discarded immediately; instead, they are returned in a list and then discarded outside the synchronization scope. Given that completed checkpoints are already discarded in an asynchronous thread in `ZooKeeperCompletedCheckpointStore` (which is the store more commonly used in practice), we can avoid using another asynchronous executor here.

What do you think?
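A minimal sketch of the alternative described above, assuming a generic registry keyed by strings (`ListReturningRegistry` is a hypothetical name, not the actual initial implementation): reference counts are updated under the lock, and the states whose count drops to zero are returned to the caller, which can discard them on a thread it already owns without holding the registry lock.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical registry that hands unreferenced states back instead of discarding them. */
class ListReturningRegistry<S> {
    private final Map<String, S> states = new HashMap<>();
    private final Map<String, Integer> refCounts = new HashMap<>();

    /** Adds one reference; the first state registered under a key is kept. */
    synchronized void register(String key, S state) {
        states.putIfAbsent(key, state);
        refCounts.merge(key, 1, Integer::sum);
    }

    /**
     * Drops one reference per key and returns the states that are no longer
     * referenced. Nothing is discarded here; that is left to the caller.
     */
    synchronized List<S> unregister(Collection<String> keys) {
        List<S> unreferenced = new ArrayList<>();
        for (String key : keys) {
            Integer count = refCounts.get(key);
            if (count == null) {
                throw new IllegalStateException("Not registered: " + key);
            }
            if (count == 1) {
                refCounts.remove(key);
                unreferenced.add(states.remove(key));
            } else {
                refCounts.put(key, count - 1);
            }
        }
        return unreferenced;
    }
}
```

The caller (for example, the code that already discards a subsumed checkpoint asynchronously) would then iterate over the returned list and discard each state after `unregister` has returned, so the registry itself needs no executor.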




[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...

2017-05-12 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3870#discussion_r116161754
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
@@ -922,6 +940,39 @@ void releaseResources(boolean canceled) {
}
}
}
+
+   /**
+* A placeholder state handle for shared state that will be replaced by an original that was
+* created in a previous checkpoint. So we don't have to send the handle twice, e.g. in
+* case of {@link ByteStreamStateHandle}.
+*/
+   private static final class PlaceholderStreamStateHandle implements StreamStateHandle {
--- End diff --

I think we can move `PlaceholderStreamStateHandle` out of `RocksDBKeyedStateBackend` so that other state backends can use it as well. What do you think?
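A rough sketch of how a backend-independent placeholder could work, assuming a simplified handle interface: `Handle`, `PlaceholderHandle`, `FileHandle`, and `PlaceholderResolver` are hypothetical names, not Flink's `StreamStateHandle` API. The placeholder carries no data, and during registration each placeholder is swapped for the original handle registered under the same key, much like `oldSstFileName.setValue(result.getReference())` in the diff.

```java
import java.util.Map;

/** Hypothetical, simplified state handle. */
interface Handle {
    long getStateSize();
}

/** Placeholder for shared state that an earlier checkpoint already registered. */
final class PlaceholderHandle implements Handle {
    @Override
    public long getStateSize() {
        return 0L; // carries no data of its own
    }
}

/** A handle that actually points to uploaded data. */
final class FileHandle implements Handle {
    private final String path;
    private final long size;

    FileHandle(String path, long size) {
        this.path = path;
        this.size = size;
    }

    String getPath() {
        return path;
    }

    @Override
    public long getStateSize() {
        return size;
    }
}

final class PlaceholderResolver {
    private PlaceholderResolver() {}

    /** Replaces every placeholder in {@code files} with the handle registered under the same key. */
    static void resolve(Map<String, Handle> files, Map<String, Handle> registered) {
        for (Map.Entry<String, Handle> entry : files.entrySet()) {
            if (entry.getValue() instanceof PlaceholderHandle) {
                Handle original = registered.get(entry.getKey());
                if (original == null) {
                    throw new IllegalStateException("No registered state for " + entry.getKey());
                }
                entry.setValue(original); // allowed while iterating: not a structural change
            }
        }
    }
}
```

Because the placeholder depends only on the handle interface and a key, nothing ties it to RocksDB.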




[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...

2017-05-12 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3870#discussion_r116161318
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java ---
@@ -180,69 +176,66 @@ public long getStateSize() {
 
@Override
public void registerSharedStates(SharedStateRegistry stateRegistry) {
+
Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
 
-   for (Map.Entry newSstFileEntry : newSstFiles.entrySet()) {
-   SstFileStateHandle stateHandle = new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue());
+   for (Map.Entry newSstFileEntry : unregisteredSstFiles.entrySet()) {
+   SharedStateRegistryKey registryKey =
+   createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
 
-   int referenceCount = stateRegistry.register(stateHandle);
-   Preconditions.checkState(referenceCount == 1);
+   SharedStateRegistry.Result result =
+   stateRegistry.registerNewReference(registryKey, newSstFileEntry.getValue());
+
+   // We update our reference with the result from the registry, to prevent the following
+   // problem:
+   // A previous checkpoint n has already registered the state. This can happen if a
+   // following checkpoint (n + x) wants to reference the same state before the backend got
+   // notified that checkpoint n completed. In this case, the shared registry did
+   // deduplication and returns the previous reference.
+   newSstFileEntry.setValue(result.getReference());
}
 
-   for (Map.Entry oldSstFileEntry : oldSstFiles.entrySet()) {
-   SstFileStateHandle stateHandle = new SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue());
+   for (Map.Entry oldSstFileName : registeredSstFiles.entrySet()) {
--- End diff --

Similar to the previous comment, `oldSstFileName` can be renamed to `registeredSstFileEntry` here.




[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...

2017-05-12 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3870#discussion_r116161230
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java ---
@@ -180,69 +176,66 @@ public long getStateSize() {
 
@Override
public void registerSharedStates(SharedStateRegistry stateRegistry) {
+
Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
 
-   for (Map.Entry newSstFileEntry : newSstFiles.entrySet()) {
-   SstFileStateHandle stateHandle = new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue());
+   for (Map.Entry newSstFileEntry : unregisteredSstFiles.entrySet()) {
--- End diff --

I think it's better to rename `newSstFileEntry` to `unregisteredSstFileEntry`. What do you think?




[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...

2017-05-12 Thread shixiaogang
Github user shixiaogang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3870#discussion_r116161117
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java ---
@@ -180,69 +176,66 @@ public long getStateSize() {
 
@Override
public void registerSharedStates(SharedStateRegistry stateRegistry) {
+
Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
 
-   for (Map.Entry newSstFileEntry : newSstFiles.entrySet()) {
-   SstFileStateHandle stateHandle = new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue());
+   for (Map.Entry newSstFileEntry : unregisteredSstFiles.entrySet()) {
+   SharedStateRegistryKey registryKey =
+   createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
 
-   int referenceCount = stateRegistry.register(stateHandle);
-   Preconditions.checkState(referenceCount == 1);
+   SharedStateRegistry.Result result =
+   stateRegistry.registerNewReference(registryKey, newSstFileEntry.getValue());
+
+   // We update our reference with the result from the registry, to prevent the following
+   // problem:
+   // A previous checkpoint n has already registered the state. This can happen if a
+   // following checkpoint (n + x) wants to reference the same state before the backend got
+   // notified that checkpoint n completed. In this case, the shared registry did
+   // deduplication and returns the previous reference.
+   newSstFileEntry.setValue(result.getReference());
}
 
-   for (Map.Entry oldSstFileEntry : oldSstFiles.entrySet()) {
-   SstFileStateHandle stateHandle = new SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue());
+   for (Map.Entry oldSstFileName : registeredSstFiles.entrySet()) {
+   SharedStateRegistryKey registryKey =
+   createSharedStateRegistryKeyFromFileName(oldSstFileName.getKey());
+
+   SharedStateRegistry.Result result = stateRegistry.obtainReference(registryKey);
 
-   int referenceCount = stateRegistry.register(stateHandle);
-   Preconditions.checkState(referenceCount > 1);
+   // Again we update our state handle with the result from the registry, thus replacing
+   // placeholder state handles with the originals.
+   oldSstFileName.setValue(result.getReference());
}
 
+   // Migrate state from unregistered to registered, so that it will not count as private state
+   // for #discardState() from now.
+   registeredSstFiles.putAll(unregisteredSstFiles);
+   unregisteredSstFiles.clear();
+
registered = true;
}
 
@Override
public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
+
Preconditions.checkState(registered, "The state handle has not registered its shared states yet.");
 
-   for (Map.Entry newSstFileEntry : newSstFiles.entrySet()) {
-   stateRegistry.unregister(new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue()));
+   for (Map.Entry newSstFileEntry : unregisteredSstFiles.entrySet()) {
--- End diff --

We should not unregister SST files that have never been registered.
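To illustrate the point, a minimal sketch that keeps only the two map names from the diff (`unregisteredSstFiles`, `registeredSstFiles`); everything else is hypothetical: `SstHandleSketch` is an invented name, values are strings instead of state handles, and a plain `Map<String, Integer>` of reference counts stands in for the `SharedStateRegistry`. After `registerSharedStates`, all shared files of the handle live in `registeredSstFiles`, so that is the only map `unregisterSharedStates` needs to walk.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of the incremental handle's shared-state bookkeeping. */
final class SstHandleSketch {
    final Map<String, String> unregisteredSstFiles = new HashMap<>(); // created by this checkpoint
    final Map<String, String> registeredSstFiles = new HashMap<>();   // referenced from earlier ones
    private boolean registered;

    void registerSharedStates(Map<String, Integer> refCounts) {
        if (registered) {
            throw new IllegalStateException("Already registered.");
        }
        for (String key : unregisteredSstFiles.keySet()) {
            refCounts.merge(key, 1, Integer::sum);
        }
        for (String key : registeredSstFiles.keySet()) {
            Integer count = refCounts.get(key);
            if (count == null) {
                throw new IllegalStateException("Referenced file was never registered: " + key);
            }
            refCounts.put(key, count + 1);
        }
        // From now on every shared file of this handle counts as registered.
        registeredSstFiles.putAll(unregisteredSstFiles);
        unregisteredSstFiles.clear();
        registered = true;
    }

    void unregisterSharedStates(Map<String, Integer> refCounts) {
        if (!registered) {
            throw new IllegalStateException("Not registered yet.");
        }
        // Only registeredSstFiles is walked; unregisteredSstFiles is empty at this point.
        for (String key : registeredSstFiles.keySet()) {
            Integer count = refCounts.get(key);
            if (count == null) {
                throw new IllegalStateException("Unknown file: " + key);
            }
            if (count == 1) {
                refCounts.remove(key);
            } else {
                refCounts.put(key, count - 1);
            }
        }
        registered = false;
    }
}
```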




[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...

2017-05-11 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/3870

[Flink 6537] Fixes and improvements for incremental checkpoints in RocksDB

This PR bundles several fixes and improvements for incremental checkpoints in RocksDB.

In particular, this addresses:
- [FLINK-6535]: JobID should not be part of the registration key to the SharedStateRegistry
- [FLINK-6533]: Duplicated registration of new shared state when checkpoint confirmations are still pending
- [FLINK-6527]: OperatorSubtaskState has empty implementations of (un)/registerSharedStates
- [FLINK-6504]: Lack of synchronization on materializedSstFiles in RocksDBKeyedStateBackend

It also lays the foundation for [FLINK-6534]; extended test coverage will be provided as part of [FLINK-6540].

Some of the main changes concern the way the `SharedStateRegistry` works. It can now detect and resolve duplicate state registrations and serve previously registered state by key. This way, we can avoid resending already registered state handles in the RPC and just send their registration keys instead.
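A minimal sketch of the deduplicating behavior described above, assuming string keys and a generic handle type. `DedupRegistry` is a hypothetical name; only the method names `registerNewReference` and `obtainReference` mirror the diffs, and the real signatures and `Result` type differ. If two pending checkpoints register the same new key, the first handle wins and is returned to both callers, and a later checkpoint can obtain the original handle by key alone.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical reference-counting registry with deduplication by key. */
final class DedupRegistry<H> {
    private static final class Entry<T> {
        final T handle;
        int refCount;

        Entry(T handle) {
            this.handle = handle;
        }
    }

    private final Map<String, Entry<H>> entries = new HashMap<>();

    /** Registers a new handle; if the key already exists, the earlier handle is kept and returned. */
    synchronized H registerNewReference(String key, H handle) {
        Entry<H> entry = entries.get(key);
        if (entry == null) {
            entry = new Entry<>(handle);
            entries.put(key, entry);
        }
        entry.refCount++;
        return entry.handle;
    }

    /** Adds a reference to an already registered key and returns the original handle. */
    synchronized H obtainReference(String key) {
        Entry<H> entry = entries.get(key);
        if (entry == null) {
            throw new IllegalStateException("No state registered under " + key);
        }
        entry.refCount++;
        return entry.handle;
    }

    synchronized int referenceCount(String key) {
        Entry<H> entry = entries.get(key);
        return entry == null ? 0 : entry.refCount;
    }
}
```

The returned handle is the one the caller should keep, which is why the diffs above overwrite their map entries with `result.getReference()`.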

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink FLINK-6537-part-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3870.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3870


commit 3585e224ab0e021573fe5eea582dbb7cfb1fef91
Author: Stefan Richter 
Date:   2017-05-10T12:57:55Z

[FLINK-6527] [checkpoint] OperatorSubtaskState has empty implementations of (un)/registerSharedStates

commit 6c22eca0809d9d5d6bb14950cd46b50ae2f9cf86
Author: Stefan Richter 
Date:   2017-05-10T15:59:39Z

[FLINK-6537] [checkpoint] First set of fixes for (de)registration of shared state in incremental checkpoints

commit d462e17e2b41cfb6b4a7a4fa8477c631f84106f6
Author: Stefan Richter 
Date:   2017-05-11T09:59:47Z

[FLINK-6504] [checkpoint] Fix synchronization on materializedSstFiles in RocksDBKeyedStateBackend



