[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...
Github user StefanRRichter closed the pull request at: https://github.com/apache/flink/pull/3870 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3870#discussion_r116179467

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---
    @@ -18,91 +18,137 @@
     package org.apache.flink.runtime.state;
    +import org.apache.flink.runtime.concurrent.Executors;
     import org.apache.flink.util.Preconditions;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     import java.util.HashMap;
     import java.util.Map;
    +import java.util.Objects;
    +import java.util.concurrent.Executor;
     /**
      * A {@code SharedStateRegistry} will be deployed in the
    - * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to
    + * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to
      * maintain the reference count of {@link SharedStateHandle}s which are shared
    - * among different checkpoints.
    - *
    + * among different incremental checkpoints.
      */
     public class SharedStateRegistry {
         private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistry.class);
         /** All registered state objects by an artificial key */
    -    private final Map registeredStates;
    +    private final Map registeredStates;
    +
    +    /** Executor for async state deletion */
    +    private final Executor asyncDisposalExecutor;
         public SharedStateRegistry() {
             this.registeredStates = new HashMap<>();
    +        this.asyncDisposalExecutor = Executors.directExecutor(); //TODO: FLINK-6534
    --- End diff --

    I totally agree that there should not be a new executor, that is why I
    marked it with the TODO. This is just a preparation for the full fix of
    FLINK-6534. My plan for the full fix is to pass the IO executor from the
    `CompletedCheckpointStore` and use it inside the registry. This will
    happen outside of any synchronization. For now, this code is a working
    placeholder for the full fix that I will do as a followup.
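The plan in the comment above (an executor handed in by the caller, with deletion triggered outside of any synchronization) could look roughly like the following sketch. `SketchRegistry`, its string keys, and the `Runnable` discard actions are hypothetical simplifications for illustration; this is not the actual `SharedStateRegistry` code and not the eventual FLINK-6534 fix.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;

/** Hypothetical, simplified registry; names are illustrative and not Flink's API. */
final class SketchRegistry {

    /** Reference-counted entry holding the action that deletes the state. */
    private static final class Entry {
        final Runnable discardAction;
        int referenceCount = 1;

        Entry(Runnable discardAction) {
            this.discardAction = discardAction;
        }
    }

    private final Map<String, Entry> registeredStates = new HashMap<>();

    /** Supplied by the owner (assumption: e.g. an IO executor), never created here. */
    private final Executor asyncDisposalExecutor;

    SketchRegistry(Executor asyncDisposalExecutor) {
        this.asyncDisposalExecutor = asyncDisposalExecutor;
    }

    void register(String key, Runnable discardAction) {
        synchronized (registeredStates) {
            Entry entry = registeredStates.get(key);
            if (entry == null) {
                registeredStates.put(key, new Entry(discardAction));
            } else {
                // Duplicate registration: keep the first discard action, bump the count.
                entry.referenceCount++;
            }
        }
    }

    void unregister(String key) {
        Runnable toDiscard = null;
        synchronized (registeredStates) {
            Entry entry = registeredStates.get(key);
            if (entry == null) {
                throw new IllegalStateException("Unknown key: " + key);
            }
            if (--entry.referenceCount == 0) {
                registeredStates.remove(key);
                toDiscard = entry.discardAction;
            }
        }
        // The (possibly slow) deletion is scheduled only after the lock is released.
        if (toDiscard != null) {
            asyncDisposalExecutor.execute(toDiscard);
        }
    }
}
```

With a direct executor such as `Runnable::run` the sketch behaves like the placeholder in the diff; swapping in a thread pool changes only where the deletion runs, not the registry logic.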
[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3870#discussion_r116175297

    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java ---
    @@ -180,69 +176,66 @@ public long getStateSize() {
         @Override
         public void registerSharedStates(SharedStateRegistry stateRegistry) {
    +        Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
    -        for (Map.Entry newSstFileEntry : newSstFiles.entrySet()) {
    -            SstFileStateHandle stateHandle = new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue());
    +        for (Map.Entry newSstFileEntry : unregisteredSstFiles.entrySet()) {
    --- End diff --

    For FLINK-6545 we need to familiarize the savepoint serializer with the
    new incremental handles, but currently they are in the RocksDB package,
    invisible to the savepoint classes. I am already working on making
    incremental snapshots a concept on a higher abstraction level that is
    less tightly coupled to RocksDB. I think that we can then have
    incremental snapshots also for other backends, e.g. the memory-based
    one. The abstraction is simply that all incremental snapshots can be
    divided into backend meta data, private data, newly created shared data,
    and referenced shared data. Also, the placeholder handle will become
    publicly available. Would this address your comments? I could make those
    preparations part of this PR already.
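To make the four-part division from the comment above concrete, here is a minimal sketch of what a backend-independent snapshot description might hold. The class `IncrementalSnapshotSketch`, its fields, and the use of plain strings for handles and locations are all assumptions made for illustration; the comment does not specify the eventual Flink types.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical backend-agnostic view of one incremental snapshot; not Flink's API. */
final class IncrementalSnapshotSketch {

    /** Location of the backend meta data (part 1). */
    final String backendMetaData;

    /** Private data, used by this snapshot only: name -> location (part 2). */
    final Map<String, String> privateData = new LinkedHashMap<>();

    /** Shared data first created by this snapshot: registry key -> location (part 3). */
    final Map<String, String> newSharedData = new LinkedHashMap<>();

    /** Shared data created by earlier snapshots, referenced by key only (part 4). */
    final List<String> referencedSharedKeys = new ArrayList<>();

    IncrementalSnapshotSketch(String backendMetaData) {
        this.backendMetaData = backendMetaData;
    }

    /** Locations the snapshot itself may delete when it is discarded. */
    List<String> locationsOwnedBySnapshot() {
        List<String> owned = new ArrayList<>();
        owned.add(backendMetaData);
        owned.addAll(privateData.values());
        return owned;
    }

    /** Keys whose lifetime has to be tracked by a shared registry instead. */
    List<String> sharedKeys() {
        List<String> keys = new ArrayList<>(newSharedData.keySet());
        keys.addAll(referencedSharedKeys);
        return keys;
    }
}
```

The split matters mainly for cleanup: in this sketch, only meta data and private data are deleted together with the snapshot, while both kinds of shared data go through reference counting.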
[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3870#discussion_r116174101

    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java ---
    @@ -180,69 +176,66 @@ public long getStateSize() {
         @Override
         public void registerSharedStates(SharedStateRegistry stateRegistry) {
    +        Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
    -        for (Map.Entry newSstFileEntry : newSstFiles.entrySet()) {
    -            SstFileStateHandle stateHandle = new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue());
    +        for (Map.Entry newSstFileEntry : unregisteredSstFiles.entrySet()) {
    +            SharedStateRegistryKey registryKey =
    +                createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
    -            int referenceCount = stateRegistry.register(stateHandle);
    -            Preconditions.checkState(referenceCount == 1);
    +            SharedStateRegistry.Result result =
    +                stateRegistry.registerNewReference(registryKey, newSstFileEntry.getValue());
    +
    +            // We update our reference with the result from the registry, to prevent the following
    +            // problem:
    +            // A previous checkpoint n has already registered the state. This can happen if a
    +            // following checkpoint (n + x) wants to reference the same state before the backend got
    +            // notified that checkpoint n completed. In this case, the shared registry did
    +            // deduplication and returns the previous reference.
    +            newSstFileEntry.setValue(result.getReference());
             }
    -        for (Map.Entry oldSstFileEntry : oldSstFiles.entrySet()) {
    -            SstFileStateHandle stateHandle = new SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue());
    +        for (Map.Entry oldSstFileName : registeredSstFiles.entrySet()) {
    +            SharedStateRegistryKey registryKey =
    +                createSharedStateRegistryKeyFromFileName(oldSstFileName.getKey());
    +
    +            SharedStateRegistry.Result result = stateRegistry.obtainReference(registryKey);
    -            int referenceCount = stateRegistry.register(stateHandle);
    -            Preconditions.checkState(referenceCount > 1);
    +            // Again we update our state handle with the result from the registry, thus replacing
    +            // placeholder state handles with the originals.
    +            oldSstFileName.setValue(result.getReference());
             }
    +        // Migrate state from unregistered to registered, so that it will not count as private state
    +        // for #discardState() from now.
    +        registeredSstFiles.putAll(unregisteredSstFiles);
    +        unregisteredSstFiles.clear();
    +        registered = true;
         }
         @Override
         public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
    +        Preconditions.checkState(registered, "The state handle has not registered its shared states yet.");
    -        for (Map.Entry newSstFileEntry : newSstFiles.entrySet()) {
    -            stateRegistry.unregister(new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue()));
    +        for (Map.Entry newSstFileEntry : unregisteredSstFiles.entrySet()) {
    --- End diff --

    Yes, this is not required.
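The deduplication described in the diff comments above can be illustrated with a small stand-alone registry. The method names `registerNewReference`, `obtainReference`, and `getReference` are taken from the diff; everything else (the class `DedupRegistrySketch`, string keys, the generic handle type, `getReferenceCount`) is a hypothetical simplification and not the PR's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a deduplicating, reference-counting registry. */
final class DedupRegistrySketch<H> {

    /** What the caller gets back: the canonical handle and the current count. */
    static final class Result<H> {
        private final H reference;
        private final int referenceCount;

        Result(H reference, int referenceCount) {
            this.reference = reference;
            this.referenceCount = referenceCount;
        }

        H getReference() {
            return reference;
        }

        int getReferenceCount() {
            return referenceCount;
        }
    }

    private static final class Entry<H> {
        final H handle;
        int referenceCount;

        Entry(H handle) {
            this.handle = handle;
        }
    }

    private final Map<String, Entry<H>> entries = new HashMap<>();

    /** Registers a handle; if the key is already known, the earlier handle wins. */
    synchronized Result<H> registerNewReference(String key, H handle) {
        Entry<H> entry = entries.get(key);
        if (entry == null) {
            entry = new Entry<>(handle);
            entries.put(key, entry);
        }
        entry.referenceCount++;
        return new Result<>(entry.handle, entry.referenceCount);
    }

    /** Adds a reference to state that must have been registered before. */
    synchronized Result<H> obtainReference(String key) {
        Entry<H> entry = entries.get(key);
        if (entry == null) {
            throw new IllegalStateException("No state registered under key " + key);
        }
        entry.referenceCount++;
        return new Result<>(entry.handle, entry.referenceCount);
    }
}
```

A caller that always stores `result.getReference()` back into its own map, as the diff does with `setValue`, ends up pointing at the single canonical handle even when two checkpoints race to register the same file.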
[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...
Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3870#discussion_r116172093

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---
    @@ -18,91 +18,137 @@
     package org.apache.flink.runtime.state;
    +import org.apache.flink.runtime.concurrent.Executors;
     import org.apache.flink.util.Preconditions;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     import java.util.HashMap;
     import java.util.Map;
    +import java.util.Objects;
    +import java.util.concurrent.Executor;
     /**
      * A {@code SharedStateRegistry} will be deployed in the
    - * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to
    + * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to
      * maintain the reference count of {@link SharedStateHandle}s which are shared
    - * among different checkpoints.
    - *
    + * among different incremental checkpoints.
      */
     public class SharedStateRegistry {
         private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistry.class);
         /** All registered state objects by an artificial key */
    -    private final Map registeredStates;
    +    private final Map registeredStates;
    +
    +    /** Executor for async state deletion */
    +    private final Executor asyncDisposalExecutor;
         public SharedStateRegistry() {
             this.registeredStates = new HashMap<>();
    +        this.asyncDisposalExecutor = Executors.directExecutor(); //TODO: FLINK-6534
    --- End diff --

    I prefer not to use another asynchronous executor here. In my initial
    implementation of `SharedStateRegistry`, unreferenced shared states were
    not discarded immediately but returned in a list. These unreferenced
    shared states were then discarded outside the synchronization scope.
    Given that the completed checkpoints are already discarded in an
    asynchronous thread in the `ZookeeperCompletedCheckpointStore` (which is
    more commonly used in practice), we can avoid the usage of another
    asynchronous executor here. What do you think?
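The alternative described in the comment above, where the registry only reports which states became unreferenced and leaves the deletion to the caller, might be sketched as follows. `ListReturningRegistrySketch` and its string keys are hypothetical; the original implementation referred to in the comment is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical registry that never deletes anything itself. */
final class ListReturningRegistrySketch {

    private final Map<String, Integer> referenceCounts = new HashMap<>();

    synchronized void register(String key) {
        referenceCounts.merge(key, 1, Integer::sum);
    }

    /**
     * Drops one reference per key and returns the keys that are no longer
     * referenced. The caller discards them after this method has returned,
     * i.e. outside the registry's synchronization scope.
     */
    synchronized List<String> unregisterAll(Collection<String> keys) {
        List<String> unreferenced = new ArrayList<>();
        for (String key : keys) {
            Integer count = referenceCounts.get(key);
            if (count == null) {
                throw new IllegalStateException("Not registered: " + key);
            }
            if (count == 1) {
                referenceCounts.remove(key);
                unreferenced.add(key);
            } else {
                referenceCounts.put(key, count - 1);
            }
        }
        return unreferenced;
    }
}
```

In this variant the thread that discards a completed checkpoint would also delete the returned states, so no additional executor is involved.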
[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...
Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3870#discussion_r116161754

    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -922,6 +940,39 @@ void releaseResources(boolean canceled) {
                     }
                 }
             }
    +
    +        /**
    +         * A placeholder state handle for shared state that will replaced by an original that was
    +         * created in a previous checkpoint. So we don't have to send the handle twice, e.g. in
    +         * case of {@link ByteStreamStateHandle}.
    +         */
    +        private static final class PlaceholderStreamStateHandle implements StreamStateHandle {
    --- End diff --

    I think we can move `PlaceholderStreamStateHandle` out so that it can be
    used by other state backends. What do you think?
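As a rough illustration of the placeholder idea in the Javadoc above: the sender ships a payload-free marker instead of a handle it already sent, and the receiving side swaps the marker for the original it knows under the same key. The types `StateHandleSketch`, `BytesHandleSketch`, `PlaceholderHandleSketch`, and `PlaceholderResolver` are invented for this sketch and do not mirror Flink's `StreamStateHandle` interface.

```java
import java.util.Map;

/** Hypothetical minimal handle interface; not Flink's StreamStateHandle. */
interface StateHandleSketch {
    byte[] read();
}

/** A handle that carries its payload inline, so resending it would be costly. */
final class BytesHandleSketch implements StateHandleSketch {
    private final byte[] data;

    BytesHandleSketch(byte[] data) {
        this.data = data;
    }

    @Override
    public byte[] read() {
        return data;
    }
}

/** Stand-in for state that was already sent with an earlier checkpoint. */
final class PlaceholderHandleSketch implements StateHandleSketch {
    @Override
    public byte[] read() {
        throw new UnsupportedOperationException("Placeholder must be replaced before use");
    }
}

final class PlaceholderResolver {
    private PlaceholderResolver() {}

    /** Returns the known original for a placeholder, or the received handle otherwise. */
    static StateHandleSketch resolve(
            Map<String, StateHandleSketch> known, String key, StateHandleSketch received) {
        if (received instanceof PlaceholderHandleSketch) {
            StateHandleSketch original = known.get(key);
            if (original == null) {
                throw new IllegalStateException("No original registered for " + key);
            }
            return original;
        }
        return received;
    }
}
```

Because nothing in the sketch depends on RocksDB, a type like this could live next to the registry, which is the point of the suggestion to move the class out of the backend.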
[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...
Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3870#discussion_r116161318

    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java ---
    @@ -180,69 +176,66 @@ public long getStateSize() {
         @Override
         public void registerSharedStates(SharedStateRegistry stateRegistry) {
    +        Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
    -        for (Map.Entry newSstFileEntry : newSstFiles.entrySet()) {
    -            SstFileStateHandle stateHandle = new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue());
    +        for (Map.Entry newSstFileEntry : unregisteredSstFiles.entrySet()) {
    +            SharedStateRegistryKey registryKey =
    +                createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
    -            int referenceCount = stateRegistry.register(stateHandle);
    -            Preconditions.checkState(referenceCount == 1);
    +            SharedStateRegistry.Result result =
    +                stateRegistry.registerNewReference(registryKey, newSstFileEntry.getValue());
    +
    +            // We update our reference with the result from the registry, to prevent the following
    +            // problem:
    +            // A previous checkpoint n has already registered the state. This can happen if a
    +            // following checkpoint (n + x) wants to reference the same state before the backend got
    +            // notified that checkpoint n completed. In this case, the shared registry did
    +            // deduplication and returns the previous reference.
    +            newSstFileEntry.setValue(result.getReference());
             }
    -        for (Map.Entry oldSstFileEntry : oldSstFiles.entrySet()) {
    -            SstFileStateHandle stateHandle = new SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue());
    +        for (Map.Entry oldSstFileName : registeredSstFiles.entrySet()) {
    --- End diff --

    Similar to the previous comment, `oldSstFileName` can be renamed to
    `unregisteredSstFileEntry` here.
[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...
Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3870#discussion_r116161230

    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java ---
    @@ -180,69 +176,66 @@ public long getStateSize() {
         @Override
         public void registerSharedStates(SharedStateRegistry stateRegistry) {
    +        Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
    -        for (Map.Entry newSstFileEntry : newSstFiles.entrySet()) {
    -            SstFileStateHandle stateHandle = new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue());
    +        for (Map.Entry newSstFileEntry : unregisteredSstFiles.entrySet()) {
    --- End diff --

    I think it's better to rename `newSstFileEntry` to
    `unregisteredSstFileEntry`. What do you think?
[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...
Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3870#discussion_r116161117

    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java ---
    @@ -180,69 +176,66 @@ public long getStateSize() {
         @Override
         public void registerSharedStates(SharedStateRegistry stateRegistry) {
    +        Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
    -        for (Map.Entry newSstFileEntry : newSstFiles.entrySet()) {
    -            SstFileStateHandle stateHandle = new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue());
    +        for (Map.Entry newSstFileEntry : unregisteredSstFiles.entrySet()) {
    +            SharedStateRegistryKey registryKey =
    +                createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
    -            int referenceCount = stateRegistry.register(stateHandle);
    -            Preconditions.checkState(referenceCount == 1);
    +            SharedStateRegistry.Result result =
    +                stateRegistry.registerNewReference(registryKey, newSstFileEntry.getValue());
    +
    +            // We update our reference with the result from the registry, to prevent the following
    +            // problem:
    +            // A previous checkpoint n has already registered the state. This can happen if a
    +            // following checkpoint (n + x) wants to reference the same state before the backend got
    +            // notified that checkpoint n completed. In this case, the shared registry did
    +            // deduplication and returns the previous reference.
    +            newSstFileEntry.setValue(result.getReference());
             }
    -        for (Map.Entry oldSstFileEntry : oldSstFiles.entrySet()) {
    -            SstFileStateHandle stateHandle = new SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue());
    +        for (Map.Entry oldSstFileName : registeredSstFiles.entrySet()) {
    +            SharedStateRegistryKey registryKey =
    +                createSharedStateRegistryKeyFromFileName(oldSstFileName.getKey());
    +
    +            SharedStateRegistry.Result result = stateRegistry.obtainReference(registryKey);
    -            int referenceCount = stateRegistry.register(stateHandle);
    -            Preconditions.checkState(referenceCount > 1);
    +            // Again we update our state handle with the result from the registry, thus replacing
    +            // placeholder state handles with the originals.
    +            oldSstFileName.setValue(result.getReference());
             }
    +        // Migrate state from unregistered to registered, so that it will not count as private state
    +        // for #discardState() from now.
    +        registeredSstFiles.putAll(unregisteredSstFiles);
    +        unregisteredSstFiles.clear();
    +        registered = true;
         }
         @Override
         public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
    +        Preconditions.checkState(registered, "The state handle has not registered its shared states yet.");
    -        for (Map.Entry newSstFileEntry : newSstFiles.entrySet()) {
    -            stateRegistry.unregister(new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue()));
    +        for (Map.Entry newSstFileEntry : unregisteredSstFiles.entrySet()) {
    --- End diff --

    We should not unregister those sst files that were not registered
    before.
[GitHub] flink pull request #3870: [Flink 6537] Fixes and improvements for incrementa...
GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/3870

    [Flink 6537] Fixes and improvements for incremental checkpoints in RocksDB

    This PR bundles several fixes and improvements for incremental
    checkpoints in RocksDB. In particular, this addresses:

    - [FLINK-6535] : JobID should not be part of the registration key to the SharedStateRegistry
    - [FLINK-6533] : Duplicated registration of new shared state when checkpoint confirmations are still pending
    - [FLINK-6527] : OperatorSubtaskState has empty implementations of (un)/registerSharedStates
    - [FLINK-6504] : Lack of synchronization on materializedSstFiles in RocksDBKeyedStateBackend

    It also lays a foundation for [FLINK-6534]; extended test coverage will
    be provided as part of [FLINK-6540].

    Some of the main changes are in the way the `SharedStateRegistry` works.
    It is now able to detect and resolve duplicate state registrations and
    to serve previously registered state by key. This way, we can avoid
    resending already registered state handles in the RPC, and can just send
    their registration keys instead.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink FLINK-6537-part-1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3870.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3870

commit 3585e224ab0e021573fe5eea582dbb7cfb1fef91
Author: Stefan Richter
Date:   2017-05-10T12:57:55Z

    [FLINK-6527] [checkpoint] OperatorSubtaskState has empty implementations of (un)/registerSharedStates

commit 6c22eca0809d9d5d6bb14950cd46b50ae2f9cf86
Author: Stefan Richter
Date:   2017-05-10T15:59:39Z

    [FLINK-6537] [checkpoint] First set of fixes for (de)registration of shared state in incremental checkpoints

commit d462e17e2b41cfb6b4a7a4fa8477c631f84106f6
Author: Stefan Richter
Date:   2017-05-11T09:59:47Z

    [FLINK-6504] [checkpoint] Fix synchronization on materializedSstFiles in RocksDBKeyedStateBackend