[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1247564430 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java: ## @@ -122,37 +121,20 @@ public StreamStateHandle registerReference( LOG.trace( "Duplicated registration under key {} with a placeholder (normal case)", registrationKey); -scheduledStateDeletion = newHandle; -} else if (entry.confirmed) { -LOG.info( -"Duplicated registration under key {} of a new state: {}. " -+ "This might happen if checkpoint confirmation was delayed and state backend re-uploaded the state. " -+ "Discarding the new state and keeping the old one which is included into a completed checkpoint", -registrationKey, -newHandle); -scheduledStateDeletion = newHandle; } else { -// Old entry is not in a confirmed checkpoint yet, and the new one differs. -// This might result from (omitted KG range here for simplicity): -// 1. Flink recovers from a failure using a checkpoint 1 -// 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst } -// 3. JM triggers checkpoint 2 -// 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst" -// 5. TM crashes; everything is repeated from (2) -// 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst } -// 7. JM triggers checkpoint 3 -// 8. TM sends NEW state "xyz-002.sst" -// 9. JM discards it as duplicate -// 10. checkpoint completes, but a wrong SST file is used -// So we use a new entry and discard the old one: +// might be a bug expect the StreamStateHandleWrapper used by +// ChangelogStateBackendHandleImpl LOG.info( -"Duplicated registration under key {} of a new state: {}. " -+ "This might happen during the task failover if state backend creates different states with the same key before and after the failure. 
" -+ "Discarding the OLD state and keeping the NEW one which is included into a completed checkpoint", +"the registered handle should equal to the previous one or is a placeholder, register key:{}, handle:{}", registrationKey, newHandle); -scheduledStateDeletion = entry.stateHandle; -entry.stateHandle = newHandle; +if (entry.stateHandle instanceof EmptyDiscardStateObjectForRegister) { Review Comment: Test passed after rebase latest master branch :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1246058692 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java: ## @@ -219,34 +219,40 @@ public SnapshotResult get(CloseableRegistry snapshotCloseableR } } -private void uploadSstFiles( -@Nonnull Map privateFiles, +/** upload files and return total uploaded size. */ +private long uploadSnapshotFiles( +@Nonnull List privateFiles, Review Comment: The caller is more complex; maybe it's better to keep the status quo?
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1246048717 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java: ## @@ -390,23 +393,36 @@ public void release() { } protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT = -new PreviousSnapshot(Collections.emptyMap()); +new PreviousSnapshot(Collections.emptyList()); /** Previous snapshot with uploaded sst files. */ protected static class PreviousSnapshot { -@Nullable private final Map confirmedSstFiles; - -protected PreviousSnapshot(@Nullable Map confirmedSstFiles) { -this.confirmedSstFiles = confirmedSstFiles; +@Nonnull private final Map confirmedSstFiles; Review Comment: Sorry, I didn't understand what can be simplified by this?
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1246040024 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java: ## @@ -334,53 +325,61 @@ public SnapshotResult get(CloseableRegistry snapshotCloseableR } } -private void uploadSstFiles( -@Nonnull Map sstFiles, -@Nonnull Map miscFiles, +/** upload files and return total uploaded size. */ +private long uploadSnapshotFiles( +@Nonnull List sstFiles, +@Nonnull List miscFiles, @Nonnull CloseableRegistry snapshotCloseableRegistry, @Nonnull CloseableRegistry tmpResourcesRegistry) throws Exception { // write state data Preconditions.checkState(localBackupDirectory.exists()); -Map sstFilePaths = new HashMap<>(); -Map miscFilePaths = new HashMap<>(); - Path[] files = localBackupDirectory.listDirectory(); +long uploadedSize = 0; if (files != null) { +List sstFilePaths = new ArrayList<>(files.length); +List miscFilePaths = new ArrayList<>(files.length); + createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths); final CheckpointedStateScope stateScope = sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING ? 
CheckpointedStateScope.EXCLUSIVE : CheckpointedStateScope.SHARED; -sstFiles.putAll( + +List uploadedSstFiles = stateUploader.uploadFilesToCheckpointFs( sstFilePaths, checkpointStreamFactory, stateScope, snapshotCloseableRegistry, -tmpResourcesRegistry)); -miscFiles.putAll( +tmpResourcesRegistry); +uploadedSize += +uploadedSstFiles.stream() +.mapToLong(e -> e.getHandle().getStateSize()) +.sum(); +sstFiles.addAll(uploadedSstFiles); + +List uploadedMiscFiles = stateUploader.uploadFilesToCheckpointFs( miscFilePaths, checkpointStreamFactory, stateScope, snapshotCloseableRegistry, -tmpResourcesRegistry)); - -synchronized (uploadedStateIDs) { +tmpResourcesRegistry); +uploadedSize += +uploadedMiscFiles.stream() +.mapToLong(e -> e.getHandle().getStateSize()) +.sum(); +miscFiles.addAll(uploadedMiscFiles); + +synchronized (uploadedSstFilesMap) { switch (sharingFilesStrategy) { case FORWARD_BACKWARD: case FORWARD: -uploadedStateIDs.put( -checkpointId, -sstFiles.entrySet().stream() -.collect( -Collectors.toMap( -Map.Entry::getKey, -t -> t.getValue().getStateSize(; +uploadedSstFilesMap.put(checkpointId, sstFiles); Review Comment: I had the same concern before, so I checked the context and found no code that would modify `sstFiles`, so I didn't add this protection. But now since you have the same concern, and more importantly to prevent a future change from causing the problem, I think it makes sense to use an unmodifiable collection. Done.
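The protection agreed on in the comment above can be sketched in a small, self-contained way: cache an unmodifiable copy of the uploaded file list so that no later code path can mutate it. The names below (`uploadedSstFilesMap`, plain `String` entries instead of Flink's `HandleAndLocalPath`) are simplified stand-ins, not the actual PR code:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UnmodifiableUploadCache {
    // Stand-in for the strategy's uploadedSstFilesMap: checkpointId -> uploaded sst files.
    static final Map<Long, List<String>> uploadedSstFilesMap = new HashMap<>();

    // Store a defensive, unmodifiable copy so a future caller can neither
    // mutate the cached list directly nor change it via the original reference.
    static void recordUpload(long checkpointId, List<String> sstFiles) {
        uploadedSstFilesMap.put(
                checkpointId, Collections.unmodifiableList(new ArrayList<>(sstFiles)));
    }

    public static void main(String[] args) {
        List<String> files = new ArrayList<>(List.of("001.sst", "002.sst"));
        recordUpload(2L, files);
        files.add("003.sst"); // mutating the caller's list does not affect the cache
        System.out.println(uploadedSstFilesMap.get(2L).size()); // prints 2
        try {
            uploadedSstFilesMap.get(2L).add("004.sst");
        } catch (UnsupportedOperationException e) {
            System.out.println("cached list is unmodifiable");
        }
    }
}
```

Note the defensive copy before wrapping: `Collections.unmodifiableList` alone only wraps the caller's list, which the caller could still mutate.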
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1246033453 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java: ## @@ -334,53 +325,61 @@ public SnapshotResult get(CloseableRegistry snapshotCloseableR } } -private void uploadSstFiles( -@Nonnull Map sstFiles, -@Nonnull Map miscFiles, +/** upload files and return total uploaded size. */ +private long uploadSnapshotFiles( +@Nonnull List sstFiles, +@Nonnull List miscFiles, @Nonnull CloseableRegistry snapshotCloseableRegistry, @Nonnull CloseableRegistry tmpResourcesRegistry) throws Exception { // write state data Preconditions.checkState(localBackupDirectory.exists()); -Map sstFilePaths = new HashMap<>(); -Map miscFilePaths = new HashMap<>(); - Path[] files = localBackupDirectory.listDirectory(); +long uploadedSize = 0; if (files != null) { +List sstFilePaths = new ArrayList<>(files.length); +List miscFilePaths = new ArrayList<>(files.length); + createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths); final CheckpointedStateScope stateScope = sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING ? CheckpointedStateScope.EXCLUSIVE : CheckpointedStateScope.SHARED; -sstFiles.putAll( + +List uploadedSstFiles = stateUploader.uploadFilesToCheckpointFs( sstFilePaths, checkpointStreamFactory, stateScope, snapshotCloseableRegistry, -tmpResourcesRegistry)); -miscFiles.putAll( +tmpResourcesRegistry); +uploadedSize += +uploadedSstFiles.stream() +.mapToLong(e -> e.getHandle().getStateSize()) Review Comment: Good suggestion, done.
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1246032593 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java: ## @@ -390,23 +393,36 @@ public void release() { } protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT = -new PreviousSnapshot(Collections.emptyMap()); +new PreviousSnapshot(Collections.emptyList()); /** Previous snapshot with uploaded sst files. */ protected static class PreviousSnapshot { -@Nullable private final Map confirmedSstFiles; - -protected PreviousSnapshot(@Nullable Map confirmedSstFiles) { -this.confirmedSstFiles = confirmedSstFiles; +@Nonnull private final Map confirmedSstFiles; + +protected PreviousSnapshot(@Nullable Collection confirmedSstFiles) { +this.confirmedSstFiles = +confirmedSstFiles != null +? confirmedSstFiles.stream() +.collect( +Collectors.toMap( + HandleAndLocalPath::getLocalPath, + HandleAndLocalPath::getHandle)) +: Collections.emptyMap(); } -protected Optional getUploaded(StateHandleID stateHandleID) { -if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) { -// we introduce a placeholder state handle, that is replaced with the -// original from the shared state registry (created from a previous checkpoint) -return Optional.of( -new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID))); +protected Optional getUploaded(String filename) { +if (confirmedSstFiles.containsKey(filename)) { +StreamStateHandle handle = confirmedSstFiles.get(filename); +if (handle instanceof ByteStreamStateHandle) { Review Comment: I see, fixed.
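The check being fixed here — only substituting a placeholder for handles the shared state registry actually tracks — can be sketched as follows. The types below are tiny stand-ins for Flink's `StreamStateHandle`, `ByteStreamStateHandle`, and `PlaceholderStreamStateHandle`, so this only illustrates the shape of the check, not the real implementation:

```java
import java.util.Map;
import java.util.Optional;

public class PreviousSnapshotSketch {
    interface StreamStateHandle {}
    // Inline handle: state bytes live in the checkpoint metadata, never in a shared file.
    static final class ByteStreamStateHandle implements StreamStateHandle {}
    // File-backed handle, registered with the shared state registry.
    static final class FileStateHandle implements StreamStateHandle {}
    // Placeholder the JM resolves against the previously registered handle.
    static final class PlaceholderStreamStateHandle implements StreamStateHandle {
        final StreamStateHandle original;
        PlaceholderStreamStateHandle(StreamStateHandle original) { this.original = original; }
    }

    static Optional<StreamStateHandle> getUploaded(
            Map<String, StreamStateHandle> confirmedSstFiles, String filename) {
        StreamStateHandle handle = confirmedSstFiles.get(filename);
        if (handle == null || handle instanceof ByteStreamStateHandle) {
            // Inline byte handles are not tracked by the shared state registry,
            // so the file must be re-uploaded rather than replaced by a placeholder.
            return Optional.empty();
        }
        return Optional.of(new PlaceholderStreamStateHandle(handle));
    }

    public static void main(String[] args) {
        Map<String, StreamStateHandle> confirmed =
                Map.of("001.sst", new FileStateHandle(), "002.sst", new ByteStreamStateHandle());
        System.out.println(getUploaded(confirmed, "001.sst").isPresent()); // true
        System.out.println(getUploaded(confirmed, "002.sst").isPresent()); // false
    }
}
```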
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1246031733 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java: ## @@ -32,9 +36,62 @@ public interface IncrementalKeyedStateHandle UUID getBackendIdentifier(); /** - * Returns a set of ids of all registered shared states in the backend at the time this was - * created. + * Returns a list of all shared states and the corresponding localPath in the backend at the + * time this was created. */ @Nonnull -Map getSharedStateHandles(); +List getSharedStateHandles(); + +/** A Holder of StreamStateHandle and the corresponding localPath. */ +final class HandleAndLocalPath implements Serializable { + +private static final long serialVersionUID = 7711754687567545052L; + +StreamStateHandle handle; +final String localPath; Review Comment: Yes, thanks for the careful review! Fixed.
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1246030055 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java: ## @@ -122,37 +121,20 @@ public StreamStateHandle registerReference( LOG.trace( "Duplicated registration under key {} with a placeholder (normal case)", registrationKey); -scheduledStateDeletion = newHandle; -} else if (entry.confirmed) { -LOG.info( -"Duplicated registration under key {} of a new state: {}. " -+ "This might happen if checkpoint confirmation was delayed and state backend re-uploaded the state. " -+ "Discarding the new state and keeping the old one which is included into a completed checkpoint", -registrationKey, -newHandle); -scheduledStateDeletion = newHandle; } else { -// Old entry is not in a confirmed checkpoint yet, and the new one differs. -// This might result from (omitted KG range here for simplicity): -// 1. Flink recovers from a failure using a checkpoint 1 -// 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst } -// 3. JM triggers checkpoint 2 -// 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst" -// 5. TM crashes; everything is repeated from (2) -// 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst } -// 7. JM triggers checkpoint 3 -// 8. TM sends NEW state "xyz-002.sst" -// 9. JM discards it as duplicate -// 10. checkpoint completes, but a wrong SST file is used -// So we use a new entry and discard the old one: +// might be a bug expect the StreamStateHandleWrapper used by +// ChangelogStateBackendHandleImpl LOG.info( -"Duplicated registration under key {} of a new state: {}. " -+ "This might happen during the task failover if state backend creates different states with the same key before and after the failure. 
" -+ "Discarding the OLD state and keeping the NEW one which is included into a completed checkpoint", +"the registered handle should equal to the previous one or is a placeholder, register key:{}, handle:{}", registrationKey, newHandle); -scheduledStateDeletion = entry.stateHandle; -entry.stateHandle = newHandle; +if (entry.stateHandle instanceof EmptyDiscardStateObjectForRegister) { Review Comment: You are right, I missed the 2nd case before. I've thrown an exception and tried to fix the test which then fails. But the [test ci core still fails](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50565=logs=66645748-20ed-5f80-dbdf-bb5906c15462=e32bdfab-58bb-53ea-d411-d67a54d2939f) and I don't know why, can you help me take a look @rkhachatryan ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1243650264 ## flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java: ## @@ -758,6 +762,15 @@ private ChangelogSnapshotState completeRestore( */ @Override public Optional initMaterialization() throws Exception { +if (materializedId > 0 +&& lastConfirmedMaterializationId < materializedId - 1 +&& lastFailedMaterializationId < materializedId - 1) { +LOG.info( +"materialization:{} not confirmed or failed or cancelled, skip trigger new one.", +materializedId - 1); +return Optional.empty(); +} Review Comment: 1. Done. 2. Done. 3. Actually, the check of `materializedId > 0` is meaningless; this was my mistake, I deleted it.
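The guard under discussion — skip triggering a new materialization while the previous one is neither confirmed nor failed — can be sketched standalone. The field names follow the diff, but the trigger/confirm/fail sequencing below is a simplified assumption for illustration, not the backend's actual control flow:

```java
public class MaterializationGuardSketch {
    private long materializedId = 0;
    private long lastConfirmedMaterializationId = -1;
    private long lastFailedMaterializationId = -1;

    // Returns the id of the newly triggered materialization, or -1 to skip:
    // the previous attempt is still in flight if it was neither confirmed nor failed.
    long tryTriggerMaterialization() {
        long previous = materializedId - 1;
        if (lastConfirmedMaterializationId < previous && lastFailedMaterializationId < previous) {
            return -1; // previous materialization not confirmed/failed yet, skip
        }
        return materializedId++;
    }

    void confirm(long id) { lastConfirmedMaterializationId = Math.max(lastConfirmedMaterializationId, id); }
    void fail(long id) { lastFailedMaterializationId = Math.max(lastFailedMaterializationId, id); }

    public static void main(String[] args) {
        MaterializationGuardSketch g = new MaterializationGuardSketch();
        long m0 = g.tryTriggerMaterialization();      // 0: the first trigger is always allowed
        long skipped = g.tryTriggerMaterialization(); // -1: m0 still in flight
        g.confirm(m0);
        long m1 = g.tryTriggerMaterialization();      // 1: allowed after confirmation
        System.out.println(m0 + " " + skipped + " " + m1); // prints 0 -1 1
    }
}
```

Note that with `materializedId == 0` the guard condition is already false (both trackers start at -1, which is not less than -1), which is why the extra `materializedId > 0` check in the diff was redundant.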
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1243246177 ## flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java: ## @@ -728,6 +731,7 @@ private ChangelogSnapshotState completeRestore( materializationId = Math.max(materializationId, h.getMaterializationID()); } } +this.lastFailedMaterializationId = materializationId; Review Comment: Sorry, this is my mistake. I originally wanted to set `lastConfirmedMaterializationId` to the "restored" `materializationId` so that the first materialization can be triggered. (Or we could add a field to save the id of the first materialization, and check whether it is the first materialization in `initMaterialization`.)
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1235147823 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java: ## @@ -359,11 +357,12 @@ IncrementalRemoteKeyedStateHandle copy() { stateHandleId); } -/** Create a unique key to register one of our shared state handles. */ +/** Create a unique key based on physical id to register one of our shared state handles. */ @VisibleForTesting -public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) { +public SharedStateRegistryKey createSharedStateRegistryKey(StreamStateHandle handle) { +String keyString = handle.getStreamStateHandleID().getKeyString(); return new SharedStateRegistryKey( -String.valueOf(backendIdentifier) + '-' + keyGroupRange, shId); + UUID.nameUUIDFromBytes(keyString.getBytes(StandardCharsets.UTF_8)).toString()); Review Comment: Yes, key strings tend to be longer.
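As context for the size concern above: `UUID.nameUUIDFromBytes` maps an arbitrarily long key string to a fixed 36-character type-3 (name-based, MD5) UUID, and it is deterministic, so equal physical ids always produce equal registry keys. A small standalone sketch of the derivation in the diff (the path is a made-up example, not a real checkpoint file):

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class RegistryKeySketch {
    // Derive a fixed-length registry key from a (possibly long) physical-id string,
    // mirroring the approach in the diff above.
    static String registryKey(String physicalId) {
        return UUID.nameUUIDFromBytes(physicalId.getBytes(StandardCharsets.UTF_8)).toString();
    }

    public static void main(String[] args) {
        // Hypothetical physical id of a shared sst file.
        String id = "hdfs:///checkpoints/job-42/shared/backend-xyz-002.sst";
        System.out.println(registryKey(id));
        // Deterministic: re-deriving yields the identical key.
        System.out.println(registryKey(id).equals(registryKey(id))); // true
        // Fixed length (36 chars) no matter how long the input is.
        System.out.println(registryKey(id).length()); // 36
    }
}
```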
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1235146158 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java: ## @@ -32,9 +36,62 @@ UUID getBackendIdentifier(); /** - * Returns a set of ids of all registered shared states in the backend at the time this was - * created. + * Returns a list of all shared states and the corresponding localPath in the backend at the + * time this was created. */ @Nonnull -Map getSharedStateHandles(); +List getSharedStateHandles(); + +/** A Holder of StreamStateHandle and the corresponding localPath. */ +final class HandleAndLocalPath implements Serializable { + +private static final long serialVersionUID = 7711754687567545052L; + +StreamStateHandle handle; +String localPath; Review Comment: You are right, I've changed it.
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1235145121 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java: ## @@ -32,9 +36,62 @@ UUID getBackendIdentifier(); /** - * Returns a set of ids of all registered shared states in the backend at the time this was - * created. + * Returns a list of all shared states and the corresponding localPath in the backend at the + * time this was created. */ @Nonnull -Map getSharedStateHandles(); +List getSharedStateHandles(); + +/** A Holder of StreamStateHandle and the corresponding localPath. */ +final class HandleAndLocalPath implements Serializable { Review Comment: Thank you for helping review this PR, @StefanRRichter! `IncrementalKeyedStateHandle` is an interface, so the keyword `static` is not necessary.
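The point made here can be verified with a quick standalone check: member classes declared inside an interface are implicitly `public static` (JLS 9.5), so the `static` keyword would be redundant. The names below are invented for the demonstration:

```java
import java.lang.reflect.Modifier;

public class InterfaceNestedClassCheck {
    interface Holder {
        // No 'static' keyword, yet this nested class is implicitly static
        // because it is a member of an interface.
        final class HandleAndLocalPath {
            final String localPath;
            HandleAndLocalPath(String localPath) { this.localPath = localPath; }
        }
    }

    public static void main(String[] args) {
        int mods = Holder.HandleAndLocalPath.class.getModifiers();
        System.out.println(Modifier.isStatic(mods)); // true
        // It can be instantiated without an enclosing instance, like any static nested class.
        Holder.HandleAndLocalPath h = new Holder.HandleAndLocalPath("002.sst");
        System.out.println(h.localPath); // 002.sst
    }
}
```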
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1235016273 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java: ## @@ -122,37 +121,20 @@ public StreamStateHandle registerReference( LOG.trace( "Duplicated registration under key {} with a placeholder (normal case)", registrationKey); -scheduledStateDeletion = newHandle; -} else if (entry.confirmed) { -LOG.info( -"Duplicated registration under key {} of a new state: {}. " -+ "This might happen if checkpoint confirmation was delayed and state backend re-uploaded the state. " -+ "Discarding the new state and keeping the old one which is included into a completed checkpoint", -registrationKey, -newHandle); -scheduledStateDeletion = newHandle; } else { -// Old entry is not in a confirmed checkpoint yet, and the new one differs. -// This might result from (omitted KG range here for simplicity): -// 1. Flink recovers from a failure using a checkpoint 1 -// 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst } -// 3. JM triggers checkpoint 2 -// 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst" -// 5. TM crashes; everything is repeated from (2) -// 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst } -// 7. JM triggers checkpoint 3 -// 8. TM sends NEW state "xyz-002.sst" -// 9. JM discards it as duplicate -// 10. checkpoint completes, but a wrong SST file is used -// So we use a new entry and discard the old one: +// might be a bug expect the StreamStateHandleWrapper used by +// ChangelogStateBackendHandleImpl LOG.info( -"Duplicated registration under key {} of a new state: {}. " -+ "This might happen during the task failover if state backend creates different states with the same key before and after the failure. 
" -+ "Discarding the OLD state and keeping the NEW one which is included into a completed checkpoint", +"the registered handle should equal to the previous one or is a placeholder, register key:{}, handle:{}", registrationKey, newHandle); -scheduledStateDeletion = entry.stateHandle; -entry.stateHandle = newHandle; +if (entry.stateHandle instanceof EmptyDiscardStateObjectForRegister) { Review Comment: `ChangelogStateBackendHandleImpl` will use `ChangelogStateBackendHandleImpl.StreamStateHandleWrapper` register materialized state, so I think we can't throw an exception directly. Maybe we can add a log ? BTW, `ChangelogStateBackendHandleImpl.StreamStateHandleWrapper` can be deleted after [FLINK-25862](https://issues.apache.org/jira/browse/FLINK-25862) is resolved. I'd be happy to pick up FLINK-25862 after this ticket is resolved. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1220936492

## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
## @@ -395,18 +394,16 @@ public void release() {

        /** Previous snapshot with uploaded sst files. */
        protected static class PreviousSnapshot {
-           @Nullable private final Map confirmedSstFiles;
+           @Nullable private final Map confirmedSstFiles;
-           protected PreviousSnapshot(@Nullable Map confirmedSstFiles) {
+           protected PreviousSnapshot(
+                   @Nullable Map confirmedSstFiles) {
                this.confirmedSstFiles = confirmedSstFiles;
            }
            protected Optional getUploaded(StateHandleID stateHandleID) {
                if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
-                   // we introduce a placeholder state handle, that is replaced with the
-                   // original from the shared state registry (created from a previous checkpoint)
-                   return Optional.of(
-                           new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment: Thanks for your patience in explaining @rkhachatryan , I totally agree with your last proposal. And I realized while coding that maybe we shouldn't store SharedStateRegistryKey in IncrementalRemoteKeyedStateHandle, because SharedStateRegistryKey is only used during registration and can be calculated from the physical id. More importantly, when the remote state file is relocated (savepoints are relocatable), the physical id changes accordingly. The [aforementioned change of IncrementalRemoteKeyedStateHandle](https://github.com/apache/flink/pull/22669#discussion_r1213882810) should look like this:

```
/** Shared state in the incremental checkpoint. */
private final List sharedState;

/** Private state in the incremental checkpoint. */
private final List privateState;

private static final class HandleAndLocalPath {
    StreamStateHandle handle;
    String localPath;
}
```

This way, we don't need to introduce a new serialized version for IncrementalRemoteKeyedStateHandle in `MetadataV2V3SerializerBase` either, because `HandleAndLocalPath` and `Map.Entry` can use exactly the same serialization form. WDYT?
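As a rough, self-contained illustration of the proposed shape: `StreamStateHandle` is stubbed locally (the real Flink interface is not assumed here), and the field layout is only what the comment above suggests:

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class HandleAndLocalPathSketch {
    // Local stub standing in for org.apache.flink.runtime.state.StreamStateHandle.
    public interface StreamStateHandle extends Serializable {}

    public static final class ByteHandle implements StreamStateHandle {
        final String name;
        public ByteHandle(String name) { this.name = name; }
    }

    // Pairs a remote handle with the local file name it maps to. Serialized,
    // this is the same (handle, path-string) pair a Map.Entry would produce,
    // which is why no new metadata serializer version is needed.
    public static final class HandleAndLocalPath implements Serializable {
        final StreamStateHandle handle;
        final String localPath;
        public HandleAndLocalPath(StreamStateHandle handle, String localPath) {
            this.handle = handle;
            this.localPath = localPath;
        }
    }

    public static void main(String[] args) {
        List<HandleAndLocalPath> sharedState = new ArrayList<>();
        sharedState.add(new HandleAndLocalPath(new ByteHandle("xyz-002.sst"), "002.sst"));
        System.out.println(sharedState.get(0).localPath);
    }
}
```

Storing a `List` of such pairs (rather than a map keyed by registry key) keeps the registry key out of the persisted handle, so relocating the remote file cannot leave a stale key behind.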
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1216375020

## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
## @@ -395,18 +394,16 @@ public void release() {

        /** Previous snapshot with uploaded sst files. */
        protected static class PreviousSnapshot {
-           @Nullable private final Map confirmedSstFiles;
+           @Nullable private final Map confirmedSstFiles;
-           protected PreviousSnapshot(@Nullable Map confirmedSstFiles) {
+           protected PreviousSnapshot(
+                   @Nullable Map confirmedSstFiles) {
                this.confirmedSstFiles = confirmedSstFiles;
            }
            protected Optional getUploaded(StateHandleID stateHandleID) {
                if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
-                   // we introduce a placeholder state handle, that is replaced with the
-                   // original from the shared state registry (created from a previous checkpoint)
-                   return Optional.of(
-                           new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment: Hi @fredia , in the previous discussion we mentioned that the registry key based on the remote file name (physical id) is used as the key of privateState.
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1214124394

## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
## @@ -395,18 +394,16 @@ public void release() {

        /** Previous snapshot with uploaded sst files. */
        protected static class PreviousSnapshot {
-           @Nullable private final Map confirmedSstFiles;
+           @Nullable private final Map confirmedSstFiles;
-           protected PreviousSnapshot(@Nullable Map confirmedSstFiles) {
+           protected PreviousSnapshot(
+                   @Nullable Map confirmedSstFiles) {
                this.confirmedSstFiles = confirmedSstFiles;
            }
            protected Optional getUploaded(StateHandleID stateHandleID) {
                if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
-                   // we introduce a placeholder state handle, that is replaced with the
-                   // original from the shared state registry (created from a previous checkpoint)
-                   return Optional.of(
-                           new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment:
> maybe using `Path` or `File` instead of `String` for `localPath`.

I think localPath is just a file name. `Path` is an interface and does not implement `Serializable`; `File` does implement `Serializable`, but it also ends up as a path string when it is finally serialized into the _metadata file. So I think `String` is clearer and sufficient?

About 'only use PlaceholderStreamStateHandle while the origin handle is ByteStreamStateHandle': in fact I want to suggest that developers do this in the docs of `PlaceholderStreamStateHandle` and `SharedStateRegistry`. Perhaps some bugs could have been avoided if developers had followed this advice and only performed replacements on `PlaceholderStreamStateHandle`. I'm not sure if this is over-designed; if so, please correct me. :heart:
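The serializability argument above can be checked directly against the JDK: `java.nio.file.Path` is an interface that does not extend `Serializable`, while `String` and `java.io.File` both implement it. A quick demonstration:

```java
import java.io.File;
import java.io.Serializable;
import java.nio.file.Path;

public class LocalPathFieldCheck {
    public static void main(String[] args) {
        // Path is an interface and does not extend Serializable, so it cannot
        // be written into the checkpoint _metadata file via Java serialization.
        System.out.println("Path:   " + Serializable.class.isAssignableFrom(Path.class));
        // String and File are Serializable, but File serializes as a path
        // string anyway, so a plain String is the simplest faithful choice.
        System.out.println("String: " + Serializable.class.isAssignableFrom(String.class));
        System.out.println("File:   " + Serializable.class.isAssignableFrom(File.class));
    }
}
```

This prints `false` for `Path` and `true` for the other two, which matches the reasoning for keeping `localPath` a `String`.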
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1213882810

## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
## @@ -395,18 +394,16 @@ public void release() {

        /** Previous snapshot with uploaded sst files. */
        protected static class PreviousSnapshot {
-           @Nullable private final Map confirmedSstFiles;
+           @Nullable private final Map confirmedSstFiles;
-           protected PreviousSnapshot(@Nullable Map confirmedSstFiles) {
+           protected PreviousSnapshot(
+                   @Nullable Map confirmedSstFiles) {
                this.confirmedSstFiles = confirmedSstFiles;
            }
            protected Optional getUploaded(StateHandleID stateHandleID) {
                if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
-                   // we introduce a placeholder state handle, that is replaced with the
-                   // original from the shared state registry (created from a previous checkpoint)
-                   return Optional.of(
-                           new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment: Thanks for your patience @rkhachatryan .

> But I still recommend limiting its usage to ByteStreamStateHandle, and don't use it to calculate checkpointed size.

Before this PR, RocksDBSnapshotStrategyBase used PlaceholderStreamStateHandle no matter whether the original handle was a ByteStreamStateHandle or a FileStateHandle. I suggest only using PlaceholderStreamStateHandle when the original handle is a ByteStreamStateHandle. This PR already stops using PlaceholderStreamStateHandle to calculate the checkpointed size, and I want to keep that.

> That might not be necessary, if the registration key is the same key as in IncrementalRemoteKeyedStateHandle.sharedState, as per https://github.com/apache/flink/pull/22669#discussion_r1210450731.

Do you mean changing IncrementalRemoteKeyedStateHandle like this?

```
/** Shared state in the incremental checkpoint. */
private final Map sharedState;

/** Private state in the incremental checkpoint. */
private final List privateState;

private static final class HandleAndLocalPath {
    StreamStateHandle handle;
    String localPath;
}
```
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1212512978

## flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java:
## @@ -324,7 +325,7 @@ public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpo

        for (Map.Entry sharedStateHandle : sharedState.entrySet()) {
            SharedStateRegistryKey registryKey =
-                   createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
+                   createSharedStateRegistryKey(sharedStateHandle.getValue());

Review Comment: I thought it was just a structural improvement before, but now I realize it was really ill-conceived. We shouldn't be "pollution first, governance later". Thanks for the correction @rkhachatryan .
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1212507940

## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
## @@ -395,18 +394,16 @@ public void release() {

        /** Previous snapshot with uploaded sst files. */
        protected static class PreviousSnapshot {
-           @Nullable private final Map confirmedSstFiles;
+           @Nullable private final Map confirmedSstFiles;
-           protected PreviousSnapshot(@Nullable Map confirmedSstFiles) {
+           protected PreviousSnapshot(
+                   @Nullable Map confirmedSstFiles) {
                this.confirmedSstFiles = confirmedSstFiles;
            }
            protected Optional getUploaded(StateHandleID stateHandleID) {
                if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
-                   // we introduce a placeholder state handle, that is replaced with the
-                   // original from the shared state registry (created from a previous checkpoint)
-                   return Optional.of(
-                           new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment: I really don't have a full test to prove that removing PlaceholderStreamStateHandle causes no regression, so I agree to keep it. But I still recommend limiting its usage to ByteStreamStateHandle, and not using it to calculate the checkpointed size. And I suggest saving the PhysicalStateHandleID of the original StateHandle into the PlaceholderStreamStateHandle to simplify registration; we can then check it before replacing. WDYT @rkhachatryan ?
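A tiny sketch of the "store the physical id in the placeholder and check it before replacing" idea suggested above; all class names here are hypothetical stand-ins, not Flink's real types:

```java
public class PlaceholderCheckSketch {
    // Hypothetical placeholder that remembers the physical id of the handle
    // it stands for, so the registry can verify a replacement before applying it.
    public static final class Placeholder {
        final String physicalId;
        public Placeholder(String physicalId) { this.physicalId = physicalId; }
    }

    public static final class FileHandle {
        final String physicalId;
        public FileHandle(String physicalId) { this.physicalId = physicalId; }
    }

    // Replace the placeholder only if it points at the same physical file;
    // a mismatch means the caller is about to resolve against the wrong state.
    public static FileHandle resolve(Placeholder p, FileHandle original) {
        if (!p.physicalId.equals(original.physicalId)) {
            throw new IllegalStateException(
                    "placeholder " + p.physicalId + " does not match " + original.physicalId);
        }
        return original;
    }

    public static void main(String[] args) {
        FileHandle original = new FileHandle("xyz-002.sst");
        System.out.println(resolve(new Placeholder("xyz-002.sst"), original).physicalId);
    }
}
```

The check turns a silent wrong-file substitution (the class of bug this PR fixes) into a loud failure at registration time.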
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1211171195

## flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java:
## @@ -361,9 +362,10 @@

        /** Create a unique key to register one of our shared state handles. */
        @VisibleForTesting
-       public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
+       public SharedStateRegistryKey createSharedStateRegistryKey(StreamStateHandle handle) {
+           String keyString = handle.getStreamStateHandleID().getKeyString();
            return new SharedStateRegistryKey(
-                   String.valueOf(backendIdentifier) + '-' + keyGroupRange, shId);
+                   UUID.nameUUIDFromBytes(keyString.getBytes(StandardCharsets.UTF_8)).toString());

Review Comment: Do you mean like in ChangelogStateHandleStreamImpl? Since the basic unit of deletion is the StreamStateHandle, I instead think it's correct to use a key based on StreamStateHandle#getStreamStateHandleID(). If SharedStateRegistryKey is based on pairs, files may be deleted while they are still in use. And the changes here do not affect ChangelogStateHandleStreamImpl, whose registration logic is independent.
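The key derivation in the diff relies on `UUID.nameUUIDFromBytes`, which produces a deterministic name-based (MD5) UUID, so the same physical id always yields the same registry key and concurrent checkpoints that reference the same file register under one key. A small demo with a hypothetical helper (the path strings are made up for illustration):

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class RegistryKeyDemo {
    // Hypothetical helper mirroring the shape of the PR's
    // createSharedStateRegistryKey: derive the key from the handle's
    // physical id, modeled here as a plain string.
    public static String registryKey(String physicalId) {
        return UUID.nameUUIDFromBytes(physicalId.getBytes(StandardCharsets.UTF_8)).toString();
    }

    public static void main(String[] args) {
        String k1 = registryKey("hdfs:///checkpoints/shared/xyz-002.sst");
        String k2 = registryKey("hdfs:///checkpoints/shared/xyz-002.sst");
        String k3 = registryKey("hdfs:///checkpoints/shared/xyz-003.sst");
        System.out.println(k1.equals(k2)); // same physical id -> same key
        System.out.println(k1.equals(k3)); // different id -> different key
    }
}
```

Because the key is a pure function of the physical id, no backend identifier or key-group range needs to be persisted alongside the handle.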
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1211153673

## flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java:
## @@ -243,6 +244,60 @@ public void testNonEmptyIntersection() {

        assertEquals(handle.getStateHandleId(), newHandle.getStateHandleId());
    }

+   @Test
+   public void testConcurrentCheckpointSharedStateRegistration() throws Exception {
+       StateHandleID handleID = new StateHandleID("1.sst");
+       StreamStateHandle streamHandle1 = new ByteStreamStateHandle("file-1", new byte[] {'s'});
+       StreamStateHandle streamHandle2 = new ByteStreamStateHandle("file-2", new byte[] {'s'});
+
+       SharedStateRegistry registry = new SharedStateRegistryImpl();
+
+       UUID backendID = UUID.randomUUID();
+
+       IncrementalRemoteKeyedStateHandle handle1 =
+               new IncrementalRemoteKeyedStateHandle(
+                       backendID,
+                       KeyGroupRange.of(0, 0),
+                       1L,
+                       placeSpies(
+                               new HashMap() {
+                                   {
+                                       put(handleID, streamHandle1);
+                                   }
+                               }),

Review Comment: Collections.singletonMap returns an immutable map, but we need to replace the value via IncrementalRemoteKeyedStateHandleTest#placeSpies.
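The `Collections.singletonMap` point is easy to demonstrate: its `put` throws `UnsupportedOperationException`, while a `HashMap` lets a helper like the test's spy placement swap the value in place. A minimal sketch (the map contents are illustrative only):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SingletonMapDemo {
    public static void main(String[] args) {
        Map<String, String> immutable = Collections.singletonMap("1.sst", "file-1");
        try {
            immutable.put("1.sst", "file-1-spy"); // mutation is rejected
        } catch (UnsupportedOperationException e) {
            System.out.println("singletonMap rejects put");
        }

        // A HashMap allows the value to be replaced after construction,
        // which is what a spy-placing helper needs.
        Map<String, String> mutable = new HashMap<>();
        mutable.put("1.sst", "file-1");
        mutable.put("1.sst", "file-1-spy");
        System.out.println(mutable.get("1.sst"));
    }
}
```

Hence the double-brace `HashMap` in the test instead of `Collections.singletonMap`.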
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1211126751

## flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java:
## @@ -324,7 +325,7 @@ public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpo

        for (Map.Entry sharedStateHandle : sharedState.entrySet()) {
            SharedStateRegistryKey registryKey =
-                   createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
+                   createSharedStateRegistryKey(sharedStateHandle.getValue());

Review Comment: I agree, but that needs a change to the checkpoint metadata serializer. What do you think about opening a new ticket to do this? If possible, I will start as soon as this ticket is completed.
[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
zoltar9264 commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1211024303

## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
## @@ -395,18 +394,16 @@ public void release() {

        /** Previous snapshot with uploaded sst files. */
        protected static class PreviousSnapshot {
-           @Nullable private final Map confirmedSstFiles;
+           @Nullable private final Map confirmedSstFiles;
-           protected PreviousSnapshot(@Nullable Map confirmedSstFiles) {
+           protected PreviousSnapshot(
+                   @Nullable Map confirmedSstFiles) {
                this.confirmedSstFiles = confirmedSstFiles;
            }
            protected Optional getUploaded(StateHandleID stateHandleID) {
                if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
-                   // we introduce a placeholder state handle, that is replaced with the
-                   // original from the shared state registry (created from a previous checkpoint)
-                   return Optional.of(
-                           new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment: ByteStreamStateHandle tends to occupy little memory (no more than 20 KB by default), and this memory cost only arises during network transmission. When a repeated ByteStreamStateHandle is passed to the SharedStateRegistry, it is discarded. And I think we should try to avoid generating small files in the first place (maybe through file aggregation, etc.).

Using PlaceholderStreamStateHandle makes it difficult to implement SharedStateRegistryKey based on the physical id, although it can be achieved by storing the physical id of the original StateHandle in the PlaceholderStreamStateHandle. But I still think removing the PlaceholderStreamStateHandle would make the SharedStateRegistry much simpler and clearer. Since I haven't observed bottlenecks in ByteStreamStateHandle delivery in a job with 2.6 TB of state, I think the RPC overhead of removing PlaceholderStreamStateHandle is acceptable.

Is the above explanation acceptable to you @rkhachatryan ?