[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-30 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1247564430


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##
@@ -122,37 +121,20 @@ public StreamStateHandle registerReference(
 LOG.trace(
 "Duplicated registration under key {} with a placeholder (normal case)",
 registrationKey);
-scheduledStateDeletion = newHandle;
-} else if (entry.confirmed) {
-LOG.info(
-"Duplicated registration under key {} of a new state: {}. "
-+ "This might happen if checkpoint confirmation was delayed and state backend re-uploaded the state. "
-+ "Discarding the new state and keeping the old one which is included into a completed checkpoint",
-registrationKey,
-newHandle);
-scheduledStateDeletion = newHandle;
 } else {
-// Old entry is not in a confirmed checkpoint yet, and the new one differs.
-// This might result from (omitted KG range here for simplicity):
-// 1. Flink recovers from a failure using a checkpoint 1
-// 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
-// 3. JM triggers checkpoint 2
-// 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
-// 5. TM crashes; everything is repeated from (2)
-// 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
-// 7. JM triggers checkpoint 3
-// 8. TM sends NEW state "xyz-002.sst"
-// 9. JM discards it as duplicate
-// 10. checkpoint completes, but a wrong SST file is used
-// So we use a new entry and discard the old one:
+// might be a bug except the StreamStateHandleWrapper used by
+// ChangelogStateBackendHandleImpl
 LOG.info(
-"Duplicated registration under key {} of a new state: {}. "
-+ "This might happen during the task failover if state backend creates different states with the same key before and after the failure. "
-+ "Discarding the OLD state and keeping the NEW one which is included into a completed checkpoint",
+"the registered handle should equal to the previous one or is a placeholder, register key:{}, handle:{}",
 registrationKey,
 newHandle);
-scheduledStateDeletion = entry.stateHandle;
-entry.stateHandle = newHandle;
+if (entry.stateHandle instanceof EmptyDiscardStateObjectForRegister) {

Review Comment:
   Tests passed after rebasing onto the latest master branch :)
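The scenario deleted in this diff describes why keying shared state by local file name breaks once checkpoints overlap: two in-flight checkpoints can upload the same local file to two different remote locations, and the second upload gets "deduplicated" away even though the first checkpoint may never complete. A minimal, hypothetical sketch with plain maps and strings (not Flink's actual `SharedStateRegistry` API) to illustrate the difference between filename keys and physical-id keys:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the FLINK-29913 problem. A registry that keeps the first handle
// registered under a key discards a concurrent re-upload of the same file name;
// keying by the handle's physical id (remote path) keeps both uploads alive.
public class RegistryKeySketch {

    // Registers remotePath under key; returns the handle that survives (keep-first).
    static String register(Map<String, String> registry, String key, String remotePath) {
        return registry.merge(key, remotePath, (existing, fresh) -> existing);
    }

    public static void main(String[] args) {
        // Keyed by local file name: the second checkpoint's upload is discarded.
        Map<String, String> byFileName = new HashMap<>();
        register(byFileName, "002.sst", "s3://ckpt-2/abc");
        String survivor = register(byFileName, "002.sst", "s3://ckpt-3/def");
        System.out.println(survivor); // s3://ckpt-2/abc — ckpt-3's copy is lost

        // Keyed by physical id: both uploads stay registered independently.
        Map<String, String> byPhysicalId = new HashMap<>();
        register(byPhysicalId, "s3://ckpt-2/abc", "s3://ckpt-2/abc");
        register(byPhysicalId, "s3://ckpt-3/def", "s3://ckpt-3/def");
        System.out.println(byPhysicalId.size()); // 2
    }
}
```

This is only a sketch of the keying scheme, not of the registry's reference counting or deletion scheduling.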



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-28 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1246058692


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java:
##
@@ -219,34 +219,40 @@ public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableR
 }
 }
 
-private void uploadSstFiles(
-@Nonnull Map<StateHandleID, StreamStateHandle> privateFiles,
+/** upload files and return total uploaded size. */
+private long uploadSnapshotFiles(
+@Nonnull List<HandleAndLocalPath> privateFiles,

Review Comment:
   The caller is more complex; maybe it's better to keep the status quo?






[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-28 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1246048717


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##
@@ -390,23 +393,36 @@ public void release() {
 }
 
 protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
-new PreviousSnapshot(Collections.emptyMap());
+new PreviousSnapshot(Collections.emptyList());
 
 /** Previous snapshot with uploaded sst files. */
 protected static class PreviousSnapshot {
 
-@Nullable private final Map<StateHandleID, StreamStateHandle> confirmedSstFiles;
-
-protected PreviousSnapshot(@Nullable Map<StateHandleID, StreamStateHandle> confirmedSstFiles) {
-this.confirmedSstFiles = confirmedSstFiles;
+@Nonnull private final Map<String, StreamStateHandle> confirmedSstFiles;

Review Comment:
   Sorry, I didn't understand what can be simplified by this?






[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-28 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1246040024


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java:
##
@@ -334,53 +325,61 @@ public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableR
 }
 }
 
-private void uploadSstFiles(
-@Nonnull Map<StateHandleID, StreamStateHandle> sstFiles,
-@Nonnull Map<StateHandleID, StreamStateHandle> miscFiles,
+/** upload files and return total uploaded size. */
+private long uploadSnapshotFiles(
+@Nonnull List<HandleAndLocalPath> sstFiles,
+@Nonnull List<HandleAndLocalPath> miscFiles,
 @Nonnull CloseableRegistry snapshotCloseableRegistry,
 @Nonnull CloseableRegistry tmpResourcesRegistry)
 throws Exception {
 
 // write state data
 Preconditions.checkState(localBackupDirectory.exists());
 
-Map<StateHandleID, Path> sstFilePaths = new HashMap<>();
-Map<StateHandleID, Path> miscFilePaths = new HashMap<>();
-
 Path[] files = localBackupDirectory.listDirectory();
+long uploadedSize = 0;
 if (files != null) {
+List<Path> sstFilePaths = new ArrayList<>(files.length);
+List<Path> miscFilePaths = new ArrayList<>(files.length);
+
 createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
 
 final CheckpointedStateScope stateScope =
 sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING
 ? CheckpointedStateScope.EXCLUSIVE
 : CheckpointedStateScope.SHARED;
-sstFiles.putAll(
+
+List<HandleAndLocalPath> uploadedSstFiles =
 stateUploader.uploadFilesToCheckpointFs(
 sstFilePaths,
 checkpointStreamFactory,
 stateScope,
 snapshotCloseableRegistry,
-tmpResourcesRegistry));
-miscFiles.putAll(
+tmpResourcesRegistry);
+uploadedSize +=
+uploadedSstFiles.stream()
+.mapToLong(e -> e.getHandle().getStateSize())
+.sum();
+sstFiles.addAll(uploadedSstFiles);
+
+List<HandleAndLocalPath> uploadedMiscFiles =
 stateUploader.uploadFilesToCheckpointFs(
 miscFilePaths,
 checkpointStreamFactory,
 stateScope,
 snapshotCloseableRegistry,
-tmpResourcesRegistry));
-
-synchronized (uploadedStateIDs) {
+tmpResourcesRegistry);
+uploadedSize +=
+uploadedMiscFiles.stream()
+.mapToLong(e -> e.getHandle().getStateSize())
+.sum();
+miscFiles.addAll(uploadedMiscFiles);
+
+synchronized (uploadedSstFilesMap) {
 switch (sharingFilesStrategy) {
 case FORWARD_BACKWARD:
 case FORWARD:
-uploadedStateIDs.put(
-checkpointId,
-sstFiles.entrySet().stream()
-.collect(
-Collectors.toMap(
-Map.Entry::getKey,
-t -> t.getValue().getStateSize())));
+uploadedSstFilesMap.put(checkpointId, sstFiles);

Review Comment:
   I had the same concern before, so I checked the context and found no code that would modify `sstFiles`, so I didn't add this protection. But now since you have the same concern, and more importantly to prevent a future change from causing the problem, I think it makes sense to use an unmodifiable collection. Done.
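The protection agreed on above can be sketched in a few lines: copy the list before storing it, then wrap it so any later mutation through the stored reference fails fast. Names here are illustrative, not Flink's actual fields:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Minimal sketch of defensively storing a per-checkpoint file list:
// a copy isolates the stored list from later changes to the caller's list,
// and the unmodifiable wrapper rejects mutation through the stored reference.
public class UnmodifiableSketch {
    static List<String> freeze(List<String> sstFiles) {
        return Collections.unmodifiableList(new ArrayList<>(sstFiles));
    }

    public static void main(String[] args) {
        List<String> files = new ArrayList<>(List.of("001.sst", "002.sst"));
        List<String> stored = freeze(files);
        // Later changes to the caller's list do not leak into the stored one.
        files.add("003.sst");
        System.out.println(stored.size()); // 2
        try {
            stored.add("004.sst");
        } catch (UnsupportedOperationException e) {
            System.out.println("mutation rejected");
        }
    }
}
```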






[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-28 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1246033453


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java:
##
@@ -334,53 +325,61 @@ public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableR
 }
 }
 
-private void uploadSstFiles(
-@Nonnull Map<StateHandleID, StreamStateHandle> sstFiles,
-@Nonnull Map<StateHandleID, StreamStateHandle> miscFiles,
+/** upload files and return total uploaded size. */
+private long uploadSnapshotFiles(
+@Nonnull List<HandleAndLocalPath> sstFiles,
+@Nonnull List<HandleAndLocalPath> miscFiles,
 @Nonnull CloseableRegistry snapshotCloseableRegistry,
 @Nonnull CloseableRegistry tmpResourcesRegistry)
 throws Exception {
 
 // write state data
 Preconditions.checkState(localBackupDirectory.exists());
 
-Map<StateHandleID, Path> sstFilePaths = new HashMap<>();
-Map<StateHandleID, Path> miscFilePaths = new HashMap<>();
-
 Path[] files = localBackupDirectory.listDirectory();
+long uploadedSize = 0;
 if (files != null) {
+List<Path> sstFilePaths = new ArrayList<>(files.length);
+List<Path> miscFilePaths = new ArrayList<>(files.length);
+
 createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
 
 final CheckpointedStateScope stateScope =
 sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING
 ? CheckpointedStateScope.EXCLUSIVE
 : CheckpointedStateScope.SHARED;
-sstFiles.putAll(
+
+List<HandleAndLocalPath> uploadedSstFiles =
 stateUploader.uploadFilesToCheckpointFs(
 sstFilePaths,
 checkpointStreamFactory,
 stateScope,
 snapshotCloseableRegistry,
-tmpResourcesRegistry));
-miscFiles.putAll(
+tmpResourcesRegistry);
+uploadedSize +=
+uploadedSstFiles.stream()
+.mapToLong(e -> e.getHandle().getStateSize())

Review Comment:
   Good suggestion, done.
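The size accounting in the diff above reduces to a `mapToLong(...).sum()` over the uploaded handles. A self-contained toy version, where `UploadedFile` is a stand-in for the PR's `HandleAndLocalPath` (its field names are assumptions, not the real API):

```java
import java.util.List;

// Sums the state sizes of the uploaded handles with a primitive long stream,
// avoiding boxing and an explicit accumulator loop.
public class UploadSizeSketch {
    record UploadedFile(String localPath, long stateSize) {}

    static long totalUploadedSize(List<UploadedFile> uploaded) {
        return uploaded.stream().mapToLong(UploadedFile::stateSize).sum();
    }

    public static void main(String[] args) {
        List<UploadedFile> uploaded =
                List.of(new UploadedFile("001.sst", 1024), new UploadedFile("002.sst", 2048));
        System.out.println(totalUploadedSize(uploaded)); // 3072
    }
}
```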






[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-28 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1246032593


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##
@@ -390,23 +393,36 @@ public void release() {
 }
 
 protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
-new PreviousSnapshot(Collections.emptyMap());
+new PreviousSnapshot(Collections.emptyList());
 
 /** Previous snapshot with uploaded sst files. */
 protected static class PreviousSnapshot {
 
-@Nullable private final Map<StateHandleID, StreamStateHandle> confirmedSstFiles;
-
-protected PreviousSnapshot(@Nullable Map<StateHandleID, StreamStateHandle> confirmedSstFiles) {
-this.confirmedSstFiles = confirmedSstFiles;
+@Nonnull private final Map<String, StreamStateHandle> confirmedSstFiles;
+
+protected PreviousSnapshot(@Nullable Collection<HandleAndLocalPath> confirmedSstFiles) {
+this.confirmedSstFiles =
+confirmedSstFiles != null
+? confirmedSstFiles.stream()
+.collect(
+Collectors.toMap(
+HandleAndLocalPath::getLocalPath,
+HandleAndLocalPath::getHandle))
+: Collections.emptyMap();
 }
 
-protected Optional<StreamStateHandle> getUploaded(StateHandleID stateHandleID) {
-if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
-// we introduce a placeholder state handle, that is replaced with the
-// original from the shared state registry (created from a previous checkpoint)
-return Optional.of(
-new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));
+protected Optional<StreamStateHandle> getUploaded(String filename) {
+if (confirmedSstFiles.containsKey(filename)) {
+StreamStateHandle handle = confirmedSstFiles.get(filename);
+if (handle instanceof ByteStreamStateHandle) {

Review Comment:
   I see, fixed.
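The branch under discussion distinguishes two kinds of previously confirmed state: a file on checkpoint storage can be re-referenced through a lightweight placeholder, while state that was inlined as raw bytes (Flink's `ByteStreamStateHandle`) has no shared file to point at and must be re-uploaded. A hedged sketch with simplified stand-in types, not Flink's real handle classes:

```java
import java.util.Map;
import java.util.Optional;

// Simplified model of the getUploaded(...) decision: return a placeholder only
// when the confirmed handle actually refers to a shared file; inline byte
// handles fall through so the caller uploads the state again.
public class PlaceholderSketch {
    interface Handle {}
    record FileHandle(String remotePath) implements Handle {}
    record InlineBytesHandle(int size) implements Handle {}
    record Placeholder(Handle original) implements Handle {}

    static Optional<Handle> getUploaded(Map<String, Handle> confirmedSstFiles, String filename) {
        Handle h = confirmedSstFiles.get(filename);
        if (h == null || h instanceof InlineBytesHandle) {
            // no previously uploaded *file* to share -> caller re-uploads
            return Optional.empty();
        }
        // later replaced with the original handle from the shared state registry
        return Optional.of(new Placeholder(h));
    }
}
```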






[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-28 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1246031733


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java:
##
@@ -32,9 +36,62 @@ public interface IncrementalKeyedStateHandle
 UUID getBackendIdentifier();
 
 /**
- * Returns a set of ids of all registered shared states in the backend at the time this was
- * created.
+ * Returns a list of all shared states and the corresponding localPath in the backend at the
+ * time this was created.
  */
 @Nonnull
-Map<StateHandleID, StreamStateHandle> getSharedStateHandles();
+List<HandleAndLocalPath> getSharedStateHandles();
+
+/** A Holder of StreamStateHandle and the corresponding localPath. */
+final class HandleAndLocalPath implements Serializable {
+
+private static final long serialVersionUID = 7711754687567545052L;
+
+StreamStateHandle handle;
+final String localPath;

Review Comment:
   Yes, thanks for the careful review! Fixed.
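The shape settled on in this thread — an immutable `localPath` identity next to a replaceable handle reference — can be sketched as follows. Field and method names here are illustrative, not the PR's exact API:

```java
// Minimal holder sketch: localPath is the entry's identity and stays final,
// while the handle may be swapped when the shared state registry dedupes it
// against an equivalent, already-registered handle.
public class HandleHolderSketch {
    static final class HandleAndPath {
        private Object handle;          // may be replaced after registration
        private final String localPath; // immutable identity of the entry

        HandleAndPath(Object handle, String localPath) {
            this.handle = handle;
            this.localPath = localPath;
        }

        String getLocalPath() { return localPath; }
        Object getHandle() { return handle; }

        // The registry may hand back a previously registered equivalent handle.
        void replaceHandle(Object registered) { this.handle = registered; }
    }
}
```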






[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-28 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1246030055


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##
@@ -122,37 +121,20 @@ public StreamStateHandle registerReference(
 LOG.trace(
 "Duplicated registration under key {} with a placeholder (normal case)",
 registrationKey);
-scheduledStateDeletion = newHandle;
-} else if (entry.confirmed) {
-LOG.info(
-"Duplicated registration under key {} of a new state: {}. "
-+ "This might happen if checkpoint confirmation was delayed and state backend re-uploaded the state. "
-+ "Discarding the new state and keeping the old one which is included into a completed checkpoint",
-registrationKey,
-newHandle);
-scheduledStateDeletion = newHandle;
 } else {
-// Old entry is not in a confirmed checkpoint yet, and the new one differs.
-// This might result from (omitted KG range here for simplicity):
-// 1. Flink recovers from a failure using a checkpoint 1
-// 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
-// 3. JM triggers checkpoint 2
-// 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
-// 5. TM crashes; everything is repeated from (2)
-// 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
-// 7. JM triggers checkpoint 3
-// 8. TM sends NEW state "xyz-002.sst"
-// 9. JM discards it as duplicate
-// 10. checkpoint completes, but a wrong SST file is used
-// So we use a new entry and discard the old one:
+// might be a bug except the StreamStateHandleWrapper used by
+// ChangelogStateBackendHandleImpl
 LOG.info(
-"Duplicated registration under key {} of a new state: {}. "
-+ "This might happen during the task failover if state backend creates different states with the same key before and after the failure. "
-+ "Discarding the OLD state and keeping the NEW one which is included into a completed checkpoint",
+"the registered handle should equal to the previous one or is a placeholder, register key:{}, handle:{}",
 registrationKey,
 newHandle);
-scheduledStateDeletion = entry.stateHandle;
-entry.stateHandle = newHandle;
+if (entry.stateHandle instanceof EmptyDiscardStateObjectForRegister) {

Review Comment:
   You are right, I missed the 2nd case before. I've thrown an exception and tried to fix the test, which then fails. But the [test ci core still fails](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50565=logs=66645748-20ed-5f80-dbdf-bb5906c15462=e32bdfab-58bb-53ea-d411-d67a54d2939f) and I don't know why. Can you help me take a look @rkhachatryan ?









[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-27 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1243650264


##
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##
@@ -758,6 +762,15 @@ private ChangelogSnapshotState completeRestore(
  */
 @Override
 public Optional<MaterializationRunnable> initMaterialization() throws Exception {
+if (materializedId > 0
+&& lastConfirmedMaterializationId < materializedId - 1
+&& lastFailedMaterializationId < materializedId - 1) {
+LOG.info(
+"materialization:{} not confirmed or failed or cancelled, skip trigger new one.",
+materializedId - 1);
+return Optional.empty();
+}

Review Comment:
   1. done.
   2. done.
   3. actually, check of `materializaedId > 0` is meaningless, this is my 
mistake, deleted it.
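The guard discussed in this thread (after the redundant `materializedId > 0` check was dropped) can be reduced to one predicate: skip triggering a new materialization while the previous one, id `materializedId - 1`, is neither confirmed nor failed/cancelled. This is a hedged reconstruction following the diff's field names; the surrounding backend state is omitted:

```java
// Sketch of the trigger guard: only start a new materialization once the
// previous one has reached a terminal state (confirmed or failed).
public class MaterializationGuardSketch {
    static boolean shouldSkipTrigger(
            long materializedId,
            long lastConfirmedMaterializationId,
            long lastFailedMaterializationId) {
        long previous = materializedId - 1;
        // previous materialization still in flight -> skip triggering a new one
        return lastConfirmedMaterializationId < previous
                && lastFailedMaterializationId < previous;
    }
}
```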






[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-27 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1243246177


##
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##
@@ -728,6 +731,7 @@ private ChangelogSnapshotState completeRestore(
 materializationId = Math.max(materializationId, h.getMaterializationID());
 }
 }
+this.lastFailedMaterializationId = materializationId;

Review Comment:
   Sorry, this is my mistake. I originally wanted to set 
`lastConfirmedMaterializationId` to "restore" `materializationId` so that the 
first materialization can be triggered. (Or we can add a field to save the id 
of the first materialization, and check whether it is the first materialization 
when `initMaterialization`.)






[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-20 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1235147823


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java:
##
@@ -359,11 +357,12 @@ IncrementalRemoteKeyedStateHandle copy() {
 stateHandleId);
 }
 
-/** Create a unique key to register one of our shared state handles. */
+/** Create a unique key based on physical id to register one of our shared state handles. */
 @VisibleForTesting
-public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
+public SharedStateRegistryKey createSharedStateRegistryKey(StreamStateHandle handle) {
+String keyString = handle.getStreamStateHandleID().getKeyString();
 return new SharedStateRegistryKey(
-String.valueOf(backendIdentifier) + '-' + keyGroupRange, shId);
+UUID.nameUUIDFromBytes(keyString.getBytes(StandardCharsets.UTF_8)).toString());

Review Comment:
   Yes, key strings tend to be longer.
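The key derivation shown in the diff maps an arbitrarily long physical-id key string to a fixed-length, deterministic (name-based, version 3) UUID via `UUID.nameUUIDFromBytes`, so equal handles always produce the same registry key. A self-contained demonstration (the example path is made up):

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Shortens a long key string to a stable 36-character UUID text form.
// Equal inputs always yield equal UUIDs, so the key is reproducible across runs.
public class KeyShorteningSketch {
    static String registryKey(String keyString) {
        return UUID.nameUUIDFromBytes(keyString.getBytes(StandardCharsets.UTF_8)).toString();
    }

    public static void main(String[] args) {
        String longKey = "hdfs://namenode/flink/checkpoints/shared/abc-002.sst";
        System.out.println(registryKey(longKey));
    }
}
```

Note the trade-off: the derived key is shorter and uniform, but it is an MD5-based digest, so uniqueness is probabilistic rather than structural.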






[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-20 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1235146158


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java:
##
@@ -32,9 +36,62 @@
 UUID getBackendIdentifier();
 
 /**
- * Returns a set of ids of all registered shared states in the backend at the time this was
- * created.
+ * Returns a list of all shared states and the corresponding localPath in the backend at the
+ * time this was created.
  */
 @Nonnull
-Map<StateHandleID, StreamStateHandle> getSharedStateHandles();
+List<HandleAndLocalPath> getSharedStateHandles();
+
+/** A Holder of StreamStateHandle and the corresponding localPath. */
+final class HandleAndLocalPath implements Serializable {
+
+private static final long serialVersionUID = 7711754687567545052L;
+
+StreamStateHandle handle;
+String localPath;

Review Comment:
   You are right, I've changed it.






[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-20 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1235145121


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java:
##
@@ -32,9 +36,62 @@
 UUID getBackendIdentifier();
 
 /**
- * Returns a set of ids of all registered shared states in the backend at the time this was
- * created.
+ * Returns a list of all shared states and the corresponding localPath in the backend at the
+ * time this was created.
  */
 @Nonnull
-Map<StateHandleID, StreamStateHandle> getSharedStateHandles();
+List<HandleAndLocalPath> getSharedStateHandles();
+
+/** A Holder of StreamStateHandle and the corresponding localPath. */
+final class HandleAndLocalPath implements Serializable {

Review Comment:
   Thank you for helping to review this pr @StefanRRichter !
   `IncrementalKeyedStateHandle` is an interface, so the keyword `static` is not necessary (a class nested in an interface is implicitly static).






[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-20 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1235016273


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##
@@ -122,37 +121,20 @@ public StreamStateHandle registerReference(
 LOG.trace(
 "Duplicated registration under key {} with a placeholder (normal case)",
 registrationKey);
-scheduledStateDeletion = newHandle;
-} else if (entry.confirmed) {
-LOG.info(
-"Duplicated registration under key {} of a new state: {}. "
-+ "This might happen if checkpoint confirmation was delayed and state backend re-uploaded the state. "
-+ "Discarding the new state and keeping the old one which is included into a completed checkpoint",
-registrationKey,
-newHandle);
-scheduledStateDeletion = newHandle;
 } else {
-// Old entry is not in a confirmed checkpoint yet, and the new one differs.
-// This might result from (omitted KG range here for simplicity):
-// 1. Flink recovers from a failure using a checkpoint 1
-// 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
-// 3. JM triggers checkpoint 2
-// 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
-// 5. TM crashes; everything is repeated from (2)
-// 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
-// 7. JM triggers checkpoint 3
-// 8. TM sends NEW state "xyz-002.sst"
-// 9. JM discards it as duplicate
-// 10. checkpoint completes, but a wrong SST file is used
-// So we use a new entry and discard the old one:
+// might be a bug except the StreamStateHandleWrapper used by
+// ChangelogStateBackendHandleImpl
 LOG.info(
-"Duplicated registration under key {} of a new state: {}. "
-+ "This might happen during the task failover if state backend creates different states with the same key before and after the failure. "
-+ "Discarding the OLD state and keeping the NEW one which is included into a completed checkpoint",
+"the registered handle should equal to the previous one or is a placeholder, register key:{}, handle:{}",
 registrationKey,
 newHandle);
-scheduledStateDeletion = entry.stateHandle;
-entry.stateHandle = newHandle;
+if (entry.stateHandle instanceof EmptyDiscardStateObjectForRegister) {

Review Comment:
   `ChangelogStateBackendHandleImpl` will use `ChangelogStateBackendHandleImpl.StreamStateHandleWrapper` to register materialized state, so I think we can't throw an exception directly. Maybe we can add a log instead ?
   
   BTW, `ChangelogStateBackendHandleImpl.StreamStateHandleWrapper` can be deleted after [FLINK-25862](https://issues.apache.org/jira/browse/FLINK-25862) is resolved. I'd be happy to pick up FLINK-25862 after this ticket is resolved.






[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-07 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1220936492


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##
@@ -395,18 +394,16 @@ public void release() {
 /** Previous snapshot with uploaded sst files. */
 protected static class PreviousSnapshot {
 
-@Nullable private final Map confirmedSstFiles;
+@Nullable private final Map confirmedSstFiles;
 
-protected PreviousSnapshot(@Nullable Map confirmedSstFiles) {
+protected PreviousSnapshot(
+@Nullable Map confirmedSstFiles) {
 this.confirmedSstFiles = confirmedSstFiles;
 }
 
 protected Optional getUploaded(StateHandleID stateHandleID) {
 if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
-// we introduce a placeholder state handle, that is replaced with the
-// original from the shared state registry (created from a previous checkpoint)
-return Optional.of(
-new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment:
   Thanks for your patience in explaining @rkhachatryan , I totally agree with your last proposal.
   
   While coding, I realized that maybe we shouldn't store SharedStateRegistryKey in IncrementalRemoteKeyedStateHandle, because SharedStateRegistryKey is only used during registration and can be calculated from the physical id. More importantly, when the remote state file is relocated (a savepoint is relocatable), the physical id will change accordingly. The [aforementioned change of IncrementalRemoteKeyedStateHandle](https://github.com/apache/flink/pull/22669#discussion_r1213882810) should look like this:
   ```
   /** Shared state in the incremental checkpoint. */
   private final List sharedState;
   
   /** Private state in the incremental checkpoint. */
   private final List privateState;
   
   private static final class HandleAndLocalPath {
   StreamStateHandle handle;
   String localPath;
   }
   ```
   This way, we don't need to introduce a new serialized version of IncrementalRemoteKeyedStateHandle in `MetadataV2V3SerializerBase` either, because `HandleAndLocalPath` and `Map.Entry` can use exactly the same serialization form.
   
   WDYT ?






[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-04 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1216375020


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##

Review Comment:
   Hi @fredia , in the previous discussion we mentioned that the registry key based on the remote file name (physical id) is used as the key of privateState.






[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-02 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1214124394


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##

Review Comment:
   > maybe using `Path` or `File` instead of `String` for `localPath`.
   
   I think localPath is just a file name. `Path` is an interface and does not implement `Serializable`; `File` does implement `Serializable`, but it also becomes a path string when it is finally serialized into the _metadata file. So I think `String` is clearer and sufficient ?
   
   About 'only use PlaceholderStreamStateHandle while the origin handle is ByteStreamStateHandle':
   In fact, I want to suggest that developers do this in the docs of `PlaceholderStreamStateHandle` and `SharedStateRegistry`. Perhaps some bugs could have been avoided if developers had followed this advice and only performed replacements on `PlaceholderStreamStateHandle`. I'm not sure if this is over-designed; if so, please correct me.  :heart:
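To illustrate the point about `String` vs `Path`: a holder with a plain `String` field round-trips through Java serialization with no extra machinery, whereas `java.nio.file.Path` is an interface that does not extend `Serializable`. A hedged sketch; this `HandleAndLocalPath` is a simplified stand-in, not the PR's class:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Simplified stand-in: only the localPath field, stored as a plain String.
final class HandleAndLocalPath implements Serializable {
    private static final long serialVersionUID = 1L;
    final String localPath; // just a file name, e.g. "001.sst"

    HandleAndLocalPath(String localPath) {
        this.localPath = localPath;
    }
}

public class Main {
    // Serialize the holder and read it back, returning the recovered localPath.
    static String roundTrip(String localPath) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(new HandleAndLocalPath(localPath));
        }
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return ((HandleAndLocalPath) ois.readObject()).localPath;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("001.sst")); // prints 001.sst
    }
}
```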






[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-01 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1213882810


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##

Review Comment:
   Thanks for your patience @rkhachatryan .
   
   > But I still recommend limiting its usage to ByteStreamStateHandle, and don't use it to calculate checkpointed size.
   
   Before this PR, RocksDBSnapshotStrategyBase used PlaceholderStreamStateHandle no matter whether the origin handle was a ByteStreamStateHandle or a FileStateHandle. I suggest only using PlaceholderStreamStateHandle when the origin handle is a ByteStreamStateHandle. This PR already avoids using PlaceholderStreamStateHandle to calculate the checkpointed size, and I want to keep that.
   
   > That might not be necessary, if the registration key is the same key as in IncrementalRemoteKeyedStateHandle.sharedState, as per https://github.com/apache/flink/pull/22669#discussion_r1210450731.
   
   Do you mean changing IncrementalRemoteKeyedStateHandle like this ?
   ```
   /** Shared state in the incremental checkpoint. */
   private final Map sharedState;
   
   /** Private state in the incremental checkpoint. */
   private final List privateState;
   
   private static final class HandleAndLocalPath {
   StreamStateHandle handle;
   String localPath;
   }
   ```






[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-31 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1212512978


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java:
##
@@ -324,7 +325,7 @@ public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpo
 for (Map.Entry sharedStateHandle :
 sharedState.entrySet()) {
 SharedStateRegistryKey registryKey =
-createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
+createSharedStateRegistryKey(sharedStateHandle.getValue());

Review Comment:
   I thought it was just a structural improvement before, but now I realize that it was really ill-conceived. We shouldn't do "pollution first, governance later". Thanks for the correction @rkhachatryan .






[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-31 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1212507940


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##

Review Comment:
   I really don't have a full test to prove that removing PlaceholderStreamStateHandle causes no regression, so I agree to keep it. But I still recommend limiting its usage to ByteStreamStateHandle, and not using it to calculate the checkpointed size.
   And I suggest saving the PhysicalStateHandleID of the original StateHandle into the PlaceholderStreamStateHandle to simplify the registration, so we can check it before replacing.
   WDYT @rkhachatryan ?
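The suggestion above can be sketched roughly as follows: the placeholder records the physical id of the handle it stands for, and the registry verifies that id before performing the replacement. All class and method names here are illustrative, not the actual Flink API:

```java
// Illustrative placeholder that carries the physical id of the handle it replaces.
final class Placeholder {
    final String physicalId;

    Placeholder(String physicalId) {
        this.physicalId = physicalId;
    }
}

// Illustrative stand-in for the originally registered state handle.
final class Original {
    final String physicalId;

    Original(String physicalId) {
        this.physicalId = physicalId;
    }
}

public class Main {
    // Replace the placeholder only if its recorded physical id matches
    // the handle already held by the registry.
    static Original resolve(Placeholder p, Original registered) {
        if (!p.physicalId.equals(registered.physicalId)) {
            throw new IllegalStateException("placeholder does not match registered handle");
        }
        return registered;
    }

    public static void main(String[] args) {
        Original registered = new Original("abc-001.sst");
        System.out.println(resolve(new Placeholder("abc-001.sst"), registered).physicalId);
    }
}
```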






[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-31 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1211171195


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java:
##
@@ -361,9 +362,10 @@ IncrementalRemoteKeyedStateHandle copy() {
 
 /** Create a unique key to register one of our shared state handles. */
 @VisibleForTesting
-public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
+public SharedStateRegistryKey createSharedStateRegistryKey(StreamStateHandle handle) {
+String keyString = handle.getStreamStateHandleID().getKeyString();
 return new SharedStateRegistryKey(
-String.valueOf(backendIdentifier) + '-' + keyGroupRange, shId);
+UUID.nameUUIDFromBytes(keyString.getBytes(StandardCharsets.UTF_8)).toString());

Review Comment:
   Do you mean like in ChangelogStateHandleStreamImpl ?
   Since the basic unit of deletion is the StreamStateHandle, I instead think it's correct to use a key based on StreamStateHandle#getStreamStateHandleID() . If the SharedStateRegistryKey is based on  pairs, files may be deleted while they are still in use.
   And the changes here do not affect ChangelogStateHandleStreamImpl, whose registration logic is independent.
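For intuition on the key scheme under discussion, here is a rough sketch of deriving a registry key deterministically from a handle's physical id string, mirroring the `UUID.nameUUIDFromBytes` call in the quoted diff. The helper name and sample paths are assumptions for illustration:

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class Main {
    // Derive a deterministic (name-based, type 3) UUID key from a
    // handle's physical id string.
    static String registryKey(String physicalId) {
        return UUID.nameUUIDFromBytes(physicalId.getBytes(StandardCharsets.UTF_8)).toString();
    }

    public static void main(String[] args) {
        String a = registryKey("hdfs:/checkpoints/job/shared/abc-001.sst");
        String b = registryKey("hdfs:/checkpoints/job/shared/abc-001.sst");
        String c = registryKey("hdfs:/checkpoints/job/shared/abc-002.sst");
        System.out.println(a.equals(b)); // same physical id -> same key
        System.out.println(a.equals(c)); // different physical id -> different key
    }
}
```

Because the key is a pure function of the physical id, re-registering the same remote file always maps to the same registry entry.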






[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-31 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1211153673


##
flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java:
##
@@ -243,6 +244,60 @@ public void testNonEmptyIntersection() {
 assertEquals(handle.getStateHandleId(), newHandle.getStateHandleId());
 }
 
+@Test
+public void testConcurrentCheckpointSharedStateRegistration() throws Exception {
+StateHandleID handleID = new StateHandleID("1.sst");
+StreamStateHandle streamHandle1 = new ByteStreamStateHandle("file-1", new byte[] {'s'});
+StreamStateHandle streamHandle2 = new ByteStreamStateHandle("file-2", new byte[] {'s'});
+
+SharedStateRegistry registry = new SharedStateRegistryImpl();
+
+UUID backendID = UUID.randomUUID();
+
+IncrementalRemoteKeyedStateHandle handle1 =
+new IncrementalRemoteKeyedStateHandle(
+backendID,
+KeyGroupRange.of(0, 0),
+1L,
+placeSpies(
+new HashMap() {
+{
+put(handleID, streamHandle1);
+}
+}),

Review Comment:
   Collections.singletonMap returns an immutable map, but we need to replace the value via IncrementalRemoteKeyedStateHandleTest#placeSpies .
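A minimal demonstration of the constraint described above: `Collections.singletonMap` returns an immutable map, so any attempt to replace a value fails with `UnsupportedOperationException` (the test-double replacement itself is out of scope here):

```java
import java.util.Collections;
import java.util.Map;

public class Main {
    // Returns true if put() on the given map throws UnsupportedOperationException.
    static boolean rejectsPut(Map<String, String> map) {
        try {
            map.put("1.sst", "replacement");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, String> m = Collections.singletonMap("1.sst", "original");
        System.out.println(rejectsPut(m)); // prints true: singletonMap is immutable
    }
}
```

This is why a plain `HashMap` is used in the test instead, even for a single entry.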






[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-31 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1211126751


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java:
##

Review Comment:
   I agree, but that needs a change to the checkpoint metadata serializer. What do you think about opening a new ticket to do this? If possible, I will start on it as soon as this ticket is completed.






[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-30 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1211024303


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##

Review Comment:
   ByteStreamStateHandle tends to occupy little memory (no more than 20KB by default), and this memory is only occupied during network transmission. When a repeated ByteStreamStateHandle is passed to the SharedStateRegistry, it is discarded. Also, I think we should try to avoid generating small files in the first place (maybe through file aggregation, etc.).
   Using PlaceholderStreamStateHandle makes it difficult to implement a SharedStateRegistryKey based on the physical id, although it can be achieved by storing the physical id of the original StateHandle in the PlaceholderStreamStateHandle. But I still think removing the PlaceholderStreamStateHandle would make the SharedStateRegistry much simpler and clearer. Since I haven't observed bottlenecks in ByteStreamStateHandle delivery in a job with 2.6TB of state, I think the RPC overhead of removing PlaceholderStreamStateHandle is acceptable.
   Is the above explanation acceptable to you @rkhachatryan ?


