[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-07-18 Thread via GitHub


rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1267232851


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java:
##
@@ -359,11 +357,12 @@ IncrementalRemoteKeyedStateHandle copy() {
 stateHandleId);
 }
 
-/** Create a unique key to register one of our shared state handles. */
+/** Create a unique key based on physical id to register one of our shared 
state handles. */
 @VisibleForTesting
-public SharedStateRegistryKey 
createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
+public SharedStateRegistryKey 
createSharedStateRegistryKey(StreamStateHandle handle) {
+String keyString = handle.getStreamStateHandleID().getKeyString();
 return new SharedStateRegistryKey(
-String.valueOf(backendIdentifier) + '-' + keyGroupRange, shId);
+
UUID.nameUUIDFromBytes(keyString.getBytes(StandardCharsets.UTF_8)).toString());

Review Comment:
   >> Why do we even take the MD5 of the key string and not the key string 
itself? Is the reason to save memory for long key strings?
   
   > Yes, key strings tend to be longer.
   
   Maybe also clarify that in code?
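
A minimal sketch of the point being discussed, assuming the goal is only to bound key length: `UUID.nameUUIDFromBytes` builds a name-based (MD5) UUID, so its string form is always 36 characters no matter how long the key string is. The class name and the sample path below are hypothetical and not part of the PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/** Illustrative sketch only; not the Flink implementation. */
final class RegistryKeySketch {

    /**
     * Derives a fixed-length registry key from an arbitrarily long key string.
     * The result is a version 3 (MD5, name-based) UUID, i.e. always 36 characters.
     */
    static String toRegistryKey(String keyString) {
        return UUID.nameUUIDFromBytes(keyString.getBytes(StandardCharsets.UTF_8)).toString();
    }

    public static void main(String[] args) {
        // Hypothetical physical id of an uploaded file; real key strings may look different.
        String longKey = "hdfs://namenode/flink/checkpoints/job/shared/some-long-file-name";
        String key = toRegistryKey(longKey);
        System.out.println(longKey.length() + " chars -> " + key.length() + " chars: " + key);
    }
}
```

The trade-off is that a hash is not reversible and could in theory collide, which is presumably why a code comment explaining the choice was requested.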



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-07-18 Thread via GitHub


rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1267223874


##
flink-runtime/src/test/java/org/apache/flink/runtime/state/DiscardRecordedStateObject.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.TernaryBoolean;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** A test mock of {@link StateObject} which need record itself whether been 
discarded. */
+public interface DiscardRecordedStateObject extends StateObject {

Review Comment:
   Do I understand correctly that this class was added to avoid using mocks for state handles?
   
   Although that's a good change IMO, I think it would be nice to extract it into a separate commit, because the change touches tests of a very sensitive part (`SharedStateRegistry`).



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##
@@ -167,7 +152,6 @@ public StreamStateHandle registerReference(
 }
 } // end of synchronized (registeredStates)
 
-scheduleAsyncDelete(scheduledStateDeletion);

Review Comment:
   Can you please expand the commit message to clarify why this is not 
necessary anymore?
   
   And probably rename the commit from 
   `[hotfix][state-changelog] not trigger new materialization until previous 
one confirmed or failed or cancelled`
   to 
   `[hotfix][state-changelog] Don't trigger new materialization unless the 
previous one is confirmed/failed/cancelled`



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java:
##
@@ -359,11 +357,12 @@ IncrementalRemoteKeyedStateHandle copy() {
 stateHandleId);
 }
 
-/** Create a unique key to register one of our shared state handles. */
+/** Create a unique key based on physical id to register one of our shared 
state handles. */
 @VisibleForTesting
-public SharedStateRegistryKey 
createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
+public SharedStateRegistryKey 
createSharedStateRegistryKey(StreamStateHandle handle) {
+String keyString = handle.getStreamStateHandleID().getKeyString();

Review Comment:
   WDYT about renaming the corresponding commit from 
   `[FLINK-29913][checkpointing] make IncrementalRemoteKeyedStateHandle 
register shared state use PhysicalStateHandleID as register key`
   to something like
   `[FLINK-29913][checkpointing] Use PhysicalStateHandleID as a key for shared 
state of IncrementalRemoteKeyedStateHandle`
   and describe the problem this change solves in the commit message?






[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-26 Thread via GitHub


rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1242179978


##
flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java:
##
@@ -67,14 +67,22 @@ public interface MaterializationTarget {
 
 /**
  * This method is not thread safe. It should be called either under a 
lock or through task
- * mailbox executor.
+ * mailbox executor. Implementations should ensure that not to trigger 
materialization until
+ * the previous one not confirmed or failed.
  */
 void handleMaterializationResult(
 SnapshotResult materializedSnapshot,
 long materializationID,
 SequenceNumber upTo)
 throws Exception;
 
+/**
+ * This method is not thread safe. It should be called either under a 
lock or through task
+ * mailbox executor.
+ */

Review Comment:
   1. This is true for all the methods of this class, and is usually the case 
with most Flink code 
   2. Otherwise, use `@NotThreadSafe` on class?



##
flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java:
##
@@ -67,14 +67,22 @@ public interface MaterializationTarget {
 
 /**
  * This method is not thread safe. It should be called either under a 
lock or through task
- * mailbox executor.
+ * mailbox executor. Implementations should ensure that not to trigger 
materialization until
+ * the previous one not confirmed or failed.

Review Comment:
   ```suggestion
* mailbox executor. Implementations should not trigger 
materialization until
* the previous one has been confirmed or failed.
   ```



##
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##
@@ -728,6 +731,7 @@ private ChangelogSnapshotState completeRestore(
 materializationId = Math.max(materializationId, 
h.getMaterializationID());
 }
 }
+this.lastFailedMaterializationId = materializationId;

Review Comment:
   Why do we set `lastFailedMaterializationId` to "restore" `materializationId`?



##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##
@@ -390,23 +393,36 @@ public void release() {
 }
 
 protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
-new PreviousSnapshot(Collections.emptyMap());
+new PreviousSnapshot(Collections.emptyList());
 
 /** Previous snapshot with uploaded sst files. */
 protected static class PreviousSnapshot {
 
-@Nullable private final Map confirmedSstFiles;
-
-protected PreviousSnapshot(@Nullable Map 
confirmedSstFiles) {
-this.confirmedSstFiles = confirmedSstFiles;
+@Nonnull private final Map 
confirmedSstFiles;
+
+protected PreviousSnapshot(@Nullable Collection 
confirmedSstFiles) {
+this.confirmedSstFiles =
+confirmedSstFiles != null
+? confirmedSstFiles.stream()
+.collect(
+Collectors.toMap(
+
HandleAndLocalPath::getLocalPath,
+
HandleAndLocalPath::getHandle))
+: Collections.emptyMap();
 }
 
-protected Optional getUploaded(StateHandleID 
stateHandleID) {
-if (confirmedSstFiles != null && 
confirmedSstFiles.containsKey(stateHandleID)) {
-// we introduce a placeholder state handle, that is replaced 
with the
-// original from the shared state registry (created from a 
previous checkpoint)
-return Optional.of(
-new 
PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));
+protected Optional getUploaded(String filename) {
+if (confirmedSstFiles.containsKey(filename)) {
+StreamStateHandle handle = confirmedSstFiles.get(filename);
+if (handle instanceof ByteStreamStateHandle) {

Review Comment:
   I think this check doesn't add any value; it adds complexity through two more lines and, more importantly, by making developers think about special cases.
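
A rough sketch of the simpler shape this comment argues for, assuming a placeholder can wrap any previously uploaded handle regardless of its concrete type. All types here (`Handle`, `FileHandle`, `BytesHandle`, `Placeholder`) are simplified, hypothetical stand-ins and not the Flink classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Illustrative sketch only; the nested types are hypothetical stand-ins. */
final class PreviousSnapshotSketch {

    interface Handle {}

    static final class FileHandle implements Handle {}

    static final class BytesHandle implements Handle {}

    /** Stands in for a placeholder that refers to an already registered handle. */
    static final class Placeholder implements Handle {
        final Handle original;

        Placeholder(Handle original) {
            this.original = original;
        }
    }

    private final Map<String, Handle> confirmedSstFiles;

    PreviousSnapshotSketch(Map<String, Handle> confirmedSstFiles) {
        this.confirmedSstFiles = new HashMap<>(confirmedSstFiles);
    }

    /** No instanceof branch: every confirmed handle is replaced by a placeholder. */
    Optional<Handle> getUploaded(String filename) {
        Handle handle = confirmedSstFiles.get(filename);
        return handle == null ? Optional.empty() : Optional.of(new Placeholder(handle));
    }
}
```

With a single code path, adding a new handle type later would not require revisiting this method.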



##
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##
@@ -758,6 +762,15 @@ private ChangelogSnapshotState completeRestore(
  */
 @Override
 public Optional initMaterialization() throws 
Exception {
+if 

[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-19 Thread via GitHub


rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1234528219


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##
@@ -122,37 +121,20 @@ public StreamStateHandle registerReference(
 LOG.trace(
 "Duplicated registration under key {} with a 
placeholder (normal case)",
 registrationKey);
-scheduledStateDeletion = newHandle;
-} else if (entry.confirmed) {
-LOG.info(
-"Duplicated registration under key {} of a new state: 
{}. "
-+ "This might happen if checkpoint 
confirmation was delayed and state backend re-uploaded the state. "
-+ "Discarding the new state and keeping the 
old one which is included into a completed checkpoint",
-registrationKey,
-newHandle);
-scheduledStateDeletion = newHandle;
 } else {
-// Old entry is not in a confirmed checkpoint yet, and the new 
one differs.
-// This might result from (omitted KG range here for 
simplicity):
-// 1. Flink recovers from a failure using a checkpoint 1
-// 2. State Backend is initialized to UID xyz and a set of 
SST: { 01.sst }
-// 3. JM triggers checkpoint 2
-// 4. TM sends handle: "xyz-002.sst"; JM registers it under 
"xyz-002.sst"
-// 5. TM crashes; everything is repeated from (2)
-// 6. TM recovers from CP 1 again: backend UID "xyz", SST { 
01.sst }
-// 7. JM triggers checkpoint 3
-// 8. TM sends NEW state "xyz-002.sst"
-// 9. JM discards it as duplicate
-// 10. checkpoint completes, but a wrong SST file is used
-// So we use a new entry and discard the old one:
+// might be a bug expect the StreamStateHandleWrapper used by
+// ChangelogStateBackendHandleImpl
 LOG.info(
-"Duplicated registration under key {} of a new state: 
{}. "
-+ "This might happen during the task failover 
if state backend creates different states with the same key before and after 
the failure. "
-+ "Discarding the OLD state and keeping the 
NEW one which is included into a completed checkpoint",
+"the registered handle should equal to the previous 
one or is a placeholder, register key:{}, handle:{}",
 registrationKey,
 newHandle);
-scheduledStateDeletion = entry.stateHandle;
-entry.stateHandle = newHandle;
+if (entry.stateHandle instanceof 
EmptyDiscardStateObjectForRegister) {

Review Comment:
   Otherwise throw an exception?






[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-07 Thread via GitHub


rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1221117363


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##
@@ -395,18 +394,16 @@ public void release() {
 /** Previous snapshot with uploaded sst files. */
 protected static class PreviousSnapshot {
 
-@Nullable private final Map confirmedSstFiles;
+@Nullable private final Map 
confirmedSstFiles;
 
-protected PreviousSnapshot(@Nullable Map 
confirmedSstFiles) {
+protected PreviousSnapshot(
+@Nullable Map 
confirmedSstFiles) {
 this.confirmedSstFiles = confirmedSstFiles;
 }
 
 protected Optional getUploaded(StateHandleID 
stateHandleID) {
 if (confirmedSstFiles != null && 
confirmedSstFiles.containsKey(stateHandleID)) {
-// we introduce a placeholder state handle, that is replaced 
with the
-// original from the shared state registry (created from a 
previous checkpoint)
-return Optional.of(
-new 
PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment:
   Thanks for yours too @zoltar9264 
   
   Sounds good, I like this approach 






[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-06 Thread via GitHub


rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1220434655


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##
@@ -395,18 +394,16 @@ public void release() {
 /** Previous snapshot with uploaded sst files. */
 protected static class PreviousSnapshot {
 
-@Nullable private final Map confirmedSstFiles;
+@Nullable private final Map 
confirmedSstFiles;
 
-protected PreviousSnapshot(@Nullable Map 
confirmedSstFiles) {
+protected PreviousSnapshot(
+@Nullable Map 
confirmedSstFiles) {
 this.confirmedSstFiles = confirmedSstFiles;
 }
 
 protected Optional getUploaded(StateHandleID 
stateHandleID) {
 if (confirmedSstFiles != null && 
confirmedSstFiles.containsKey(stateHandleID)) {
-// we introduce a placeholder state handle, that is replaced 
with the
-// original from the shared state registry (created from a 
previous checkpoint)
-return Optional.of(
-new 
PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment:
   Thanks @zoltar9264 ,
   
   > For example, the registry in this issue is wrongly de-duplicated.
   > If we only perform the replacement when the placeholder is used, and 
reduce the scope of the placeholder, it is possible to avoid the problem caused 
by the registry returning the wrong handle.
   
   But this issue is not related to the `PlaceholderStreamStateHandle`, is it? 
It happens when both new and old handles are NOT 
`PlaceholderStreamStateHandle`s.
   
   > Furthermore, if we check that the handle returned by the registry is equal 
to the registered handle when the placeholder is not used, we can detect the 
problem earlier.
   
   I think the problem actually arises when the handles under the same key are NOT equal, and we need to resolve this conflict somehow. Note that this is not related to `PlaceholderStreamStateHandle`.
   
   But since in this PR we entirely eliminate the possibility of collision by 
using unique keys, such a collision would mean a bug; so we can remove that 
resolution logic and raise an error instead:
   - remove `SharedStateRegistryImpl.SharedStateEntry.confirmed` field
   - remove any calls to `scheduleAsyncDelete()` from `registerReference()`
   -  (but keep `PlaceholderStreamStateHandle`)
   
   WDYT?
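
The proposal above could look roughly like the following sketch, assuming registry keys are unique so that two different handles under one key can only mean a bug. The class, the `PLACEHOLDER` marker, and the use of plain `Object` handles are hypothetical simplifications; how a placeholder without a prior registration should be treated is also an assumption here.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch only; names and types are simplified stand-ins, not Flink classes. */
final class RegistrySketch {

    /** Hypothetical marker standing in for a placeholder handle. */
    static final Object PLACEHOLDER = new Object();

    private final Map<String, Object> registeredStates = new HashMap<>();

    /** Registers a handle under a key; no "confirmed" flag and no scheduled deletion. */
    synchronized Object registerReference(String key, Object newHandle) {
        Object existing = registeredStates.get(key);
        if (existing == null) {
            if (newHandle == PLACEHOLDER) {
                // Assumption: a placeholder must refer to an earlier registration.
                throw new IllegalStateException("Placeholder without a registered state: " + key);
            }
            registeredStates.put(key, newHandle);
            return newHandle;
        }
        if (newHandle == PLACEHOLDER || existing.equals(newHandle)) {
            // Normal case: reuse the handle registered by an earlier checkpoint.
            return existing;
        }
        // Keys are unique, so a differing handle under the same key indicates a bug.
        throw new IllegalStateException("Conflicting state registered under key " + key);
    }
}
```

Failing fast replaces the old conflict resolution, so nothing needs to be discarded from within `registerReference()`.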
   






[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-05 Thread via GitHub


rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1218292582


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##
@@ -395,18 +394,16 @@ public void release() {
 /** Previous snapshot with uploaded sst files. */
 protected static class PreviousSnapshot {
 
-@Nullable private final Map confirmedSstFiles;
+@Nullable private final Map 
confirmedSstFiles;
 
-protected PreviousSnapshot(@Nullable Map 
confirmedSstFiles) {
+protected PreviousSnapshot(
+@Nullable Map 
confirmedSstFiles) {
 this.confirmedSstFiles = confirmedSstFiles;
 }
 
 protected Optional getUploaded(StateHandleID 
stateHandleID) {
 if (confirmedSstFiles != null && 
confirmedSstFiles.containsKey(stateHandleID)) {
-// we introduce a placeholder state handle, that is replaced 
with the
-// original from the shared state registry (created from a 
previous checkpoint)
-return Optional.of(
-new 
PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment:
   > So I think String is clearer and enough ?
   
   I think `String` should work too.
   
   > About 'only use PlaceholderStreamStateHandle while the origin handle is 
ByteStreamStateHandle':
   In fact I want to suggest developers to do this in the doc of 
PlaceholderStreamStateHandle and SharedStateRegistry. Perhaps some bugs could 
have been avoided if developers followed this advice and only performed 
replacements on PlaceholderStreamStateHandle . I'm not sure if this is 
over-designed, if so please correct me.
   
   I'm sorry, but I'm still missing something: what is the issue with using `PlaceholderStreamStateHandle` for **any** kind of handle?






[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-02 Thread via GitHub


rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1214051048


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##
@@ -395,18 +394,16 @@ public void release() {
 /** Previous snapshot with uploaded sst files. */
 protected static class PreviousSnapshot {
 
-@Nullable private final Map confirmedSstFiles;
+@Nullable private final Map 
confirmedSstFiles;
 
-protected PreviousSnapshot(@Nullable Map 
confirmedSstFiles) {
+protected PreviousSnapshot(
+@Nullable Map 
confirmedSstFiles) {
 this.confirmedSstFiles = confirmedSstFiles;
 }
 
 protected Optional getUploaded(StateHandleID 
stateHandleID) {
 if (confirmedSstFiles != null && 
confirmedSstFiles.containsKey(stateHandleID)) {
-// we introduce a placeholder state handle, that is replaced 
with the
-// original from the shared state registry (created from a 
previous checkpoint)
-return Optional.of(
-new 
PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment:
   Thanks @zoltar9264 ,
   
   > Do you mean change IncrementalRemoteKeyedStateHandle like this ?
   
   Yes, I mean something like that, maybe using `Path` or `File` instead of 
`String` for `localPath`.
   
   > I suggest only use PlaceholderStreamStateHandle while the origin handle is 
ByteStreamStateHandle. This pr already implemented not use 
PlaceholderStreamStateHandle calculate checkpointed size, I want keep it.
   
   Can you share the motivation?
   I think that this will just add an additional `instanceof` and increase complexity. It would also be easier to break if we add a new type of handle that needs replacement. Or am I missing something?






[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-01 Thread via GitHub


rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1212720075


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##
@@ -395,18 +394,16 @@ public void release() {
 /** Previous snapshot with uploaded sst files. */
 protected static class PreviousSnapshot {
 
-@Nullable private final Map confirmedSstFiles;
+@Nullable private final Map 
confirmedSstFiles;
 
-protected PreviousSnapshot(@Nullable Map 
confirmedSstFiles) {
+protected PreviousSnapshot(
+@Nullable Map 
confirmedSstFiles) {
 this.confirmedSstFiles = confirmedSstFiles;
 }
 
 protected Optional getUploaded(StateHandleID 
stateHandleID) {
 if (confirmedSstFiles != null && 
confirmedSstFiles.containsKey(stateHandleID)) {
-// we introduce a placeholder state handle, that is replaced 
with the
-// original from the shared state registry (created from a 
previous checkpoint)
-return Optional.of(
-new 
PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment:
   > But I still recommend limiting its usage to ByteStreamStateHandle, and 
don't use it to calculate checkpointed size.
   
   IIUC, that's already implemented. Why would you change this?
   
   > And I suggest saving the PhysicalStateHandleID of the original StateHandle 
into the PlaceholderStreamStateHandle to simplify the registration. 
   
   That might not be necessary, if the registration key is the same key as in 
`IncrementalRemoteKeyedStateHandle.sharedState`, as per 
[comment](https://github.com/apache/flink/pull/22669#discussion_r1210450731).






[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-31 Thread via GitHub


rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1212450869


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java:
##
@@ -324,7 +325,7 @@ public void registerSharedStates(SharedStateRegistry 
stateRegistry, long checkpo
 for (Map.Entry sharedStateHandle :
 sharedState.entrySet()) {
 SharedStateRegistryKey registryKey =
-
createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
+createSharedStateRegistryKey(sharedStateHandle.getValue());

Review Comment:
   I'd rather see it as a part of this ticket, because it's directly related and one should NOT be merged without the other.
   Why would you implement this as a separate ticket?






[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-31 Thread via GitHub


rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1212449390


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java:
##
@@ -361,9 +362,10 @@ IncrementalRemoteKeyedStateHandle copy() {
 
 /** Create a unique key to register one of our shared state handles. */
 @VisibleForTesting
-public SharedStateRegistryKey 
createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
+public SharedStateRegistryKey 
createSharedStateRegistryKey(StreamStateHandle handle) {
+String keyString = handle.getStreamStateHandleID().getKeyString();
 return new SharedStateRegistryKey(
-String.valueOf(backendIdentifier) + '-' + keyGroupRange, shId);
+
UUID.nameUUIDFromBytes(keyString.getBytes(StandardCharsets.UTF_8)).toString());

Review Comment:
   Yes, I think you're right.






[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-31 Thread via GitHub


rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1212449200


##
flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java:
##
@@ -243,6 +244,60 @@ public void testNonEmptyIntersection() {
 assertEquals(handle.getStateHandleId(), newHandle.getStateHandleId());
 }
 
+@Test
+public void testConcurrentCheckpointSharedStateRegistration() throws 
Exception {
+StateHandleID handleID = new StateHandleID("1.sst");
+StreamStateHandle streamHandle1 = new ByteStreamStateHandle("file-1", 
new byte[] {'s'});
+StreamStateHandle streamHandle2 = new ByteStreamStateHandle("file-2", 
new byte[] {'s'});
+
+SharedStateRegistry registry = new SharedStateRegistryImpl();
+
+UUID backendID = UUID.randomUUID();
+
+IncrementalRemoteKeyedStateHandle handle1 =
+new IncrementalRemoteKeyedStateHandle(
+backendID,
+KeyGroupRange.of(0, 0),
+1L,
+placeSpies(
+new HashMap() {
+{
+put(handleID, streamHandle1);
+}
+}),

Review Comment:
   Fair enough.






[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-31 Thread via GitHub


rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1212443717


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##
@@ -395,18 +394,16 @@ public void release() {
 /** Previous snapshot with uploaded sst files. */
 protected static class PreviousSnapshot {
 
-@Nullable private final Map confirmedSstFiles;
+@Nullable private final Map confirmedSstFiles;
 
-protected PreviousSnapshot(@Nullable Map confirmedSstFiles) {
+protected PreviousSnapshot(
+@Nullable Map confirmedSstFiles) {
 this.confirmedSstFiles = confirmedSstFiles;
 }
 
 protected Optional getUploaded(StateHandleID stateHandleID) {
 if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
-// we introduce a placeholder state handle, that is replaced with the
-// original from the shared state registry (created from a previous checkpoint)
-return Optional.of(
-new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment:
   Thanks for the explanation @zoltar9264 
   
   My concern is exactly the additional overhead of data transmission (the 
purpose of the placeholder is to reduce it).
   I'm not sure that removing `PlaceholderStreamStateHandle` won't cause a 
regression.
   It depends not only on the state size, but also on the configuration 
(`state.storage.fs.memory-threshold`) and the actual size of the SST files.
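
   To illustrate the dependency on the threshold: as far as I understand, state 
smaller than `state.storage.fs.memory-threshold` is kept inline as bytes instead 
of being written to a separate file, so only such small SST files would be 
re-sent. A toy sketch of that decision; the class, enum, and method names are 
hypothetical, the threshold value is only an example, and the exact boundary 
handling is an assumption:

```java
// Toy model, not Flink code: decides how a piece of state travels to the JM.
final class ThresholdSketch {

    enum Placement { INLINE_BYTES, FILE_REFERENCE }

    // Assumption: state smaller than the threshold is kept inline; the exact
    // boundary handling in Flink may differ.
    static Placement place(long stateSizeBytes, long memoryThresholdBytes) {
        return stateSizeBytes < memoryThresholdBytes
                ? Placement.INLINE_BYTES
                : Placement.FILE_REFERENCE;
    }

    public static void main(String[] args) {
        long threshold = 20 * 1024; // example value only
        System.out.println(place(4 * 1024, threshold)); // small SST file
        System.out.println(place(64L * 1024 * 1024, threshold)); // large SST file
    }
}
```

   With a high threshold and many small SST files, most handles would fall into 
the inline case, which is where the extra transmission would show up.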
   
   So I think it would be safer to keep it. I agree that this requires more 
code changes, but I think it is worth it.
   The complexity added by `PlaceholderStreamStateHandle` doesn't seem very 
high.
   
   WDYT?






[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-05-30 Thread via GitHub


rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1210462542


##
flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java:
##
@@ -243,6 +244,60 @@ public void testNonEmptyIntersection() {
 assertEquals(handle.getStateHandleId(), newHandle.getStateHandleId());
 }
 
+@Test
+public void testConcurrentCheckpointSharedStateRegistration() throws Exception {
+StateHandleID handleID = new StateHandleID("1.sst");
+StreamStateHandle streamHandle1 = new ByteStreamStateHandle("file-1", new byte[] {'s'});
+StreamStateHandle streamHandle2 = new ByteStreamStateHandle("file-2", new byte[] {'s'});
+
+SharedStateRegistry registry = new SharedStateRegistryImpl();
+
+UUID backendID = UUID.randomUUID();
+
+IncrementalRemoteKeyedStateHandle handle1 =
+new IncrementalRemoteKeyedStateHandle(
+backendID,
+KeyGroupRange.of(0, 0),
+1L,
+placeSpies(
+new HashMap() {
+{
+put(handleID, streamHandle1);
+}
+}),

Review Comment:
   `Collections.singletonMap`?
   
   ditto: line 279
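
   A minimal sketch of what I mean, using plain `String` keys and values as 
stand-ins for `StateHandleID` and `StreamStateHandle` (the class and method 
names are made up for illustration):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; String stands in for StateHandleID / StreamStateHandle.
final class SingletonMapSketch {

    // Double-brace initialization: creates an anonymous HashMap subclass.
    static Map<String, String> doubleBrace(String key, String value) {
        return new HashMap<String, String>() {
            {
                put(key, value);
            }
        };
    }

    // Same content, no anonymous subclass; the returned map is immutable.
    static Map<String, String> singleton(String key, String value) {
        return Collections.singletonMap(key, value);
    }

    public static void main(String[] args) {
        Map<String, String> a = doubleBrace("1.sst", "file-1");
        Map<String, String> b = singleton("1.sst", "file-1");
        System.out.println(a.equals(b)); // the two maps are equal by content
        System.out.println(a.getClass() == HashMap.class); // anonymous subclass, not HashMap itself
    }
}
```

   Note that the singleton map is immutable, so this only works if the helper 
receiving the map does not modify it in place.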



##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##
@@ -395,18 +394,16 @@ public void release() {
 /** Previous snapshot with uploaded sst files. */
 protected static class PreviousSnapshot {
 
-@Nullable private final Map confirmedSstFiles;
+@Nullable private final Map confirmedSstFiles;
 
-protected PreviousSnapshot(@Nullable Map confirmedSstFiles) {
+protected PreviousSnapshot(
+@Nullable Map confirmedSstFiles) {
 this.confirmedSstFiles = confirmedSstFiles;
 }
 
 protected Optional getUploaded(StateHandleID stateHandleID) {
 if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
-// we introduce a placeholder state handle, that is replaced with the
-// original from the shared state registry (created from a previous checkpoint)
-return Optional.of(
-new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment:
   Could you elaborate on why you had to delete `PlaceholderStreamStateHandle`?
   
   I'm concerned that with this change, `ByteStreamStateHandle`s will always be 
sent to the JM (regardless of whether they were previously sent or not).
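
   To make the concern concrete, here is a deliberately simplified sketch of the 
placeholder idea. None of these types are the Flink classes: `Handle`, 
`InlineHandle`, `Placeholder`, and `Registry` are hypothetical stand-ins, and 
the registry logic is an assumption reduced to the minimum:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins, not the Flink classes.
final class PlaceholderSketch {

    interface Handle {
        int payloadSize();
    }

    // Carries the state bytes inline, so every send transfers the payload.
    static final class InlineHandle implements Handle {
        final byte[] data;

        InlineHandle(byte[] data) {
            this.data = data;
        }

        public int payloadSize() {
            return data.length;
        }
    }

    // Carries no payload; only signals "reuse what is already registered".
    static final class Placeholder implements Handle {
        public int payloadSize() {
            return 0;
        }
    }

    // Toy registry on the receiving side: resolves placeholders to known handles.
    static final class Registry {
        private final Map<String, Handle> entries = new HashMap<>();

        Handle register(String key, Handle incoming) {
            if (incoming instanceof Placeholder) {
                Handle existing = entries.get(key);
                if (existing == null) {
                    throw new IllegalStateException("no state registered under " + key);
                }
                return existing;
            }
            entries.put(key, incoming);
            return incoming;
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        Handle first = registry.register("1.sst", new InlineHandle(new byte[1024]));
        Handle placeholder = new Placeholder();
        Handle resolved = registry.register("1.sst", placeholder);
        System.out.println(placeholder.payloadSize()); // payload carried by the second send
        System.out.println(resolved == first); // whether the registry returned the original handle
    }
}
```

   Without the placeholder, the second registration would have to carry the 
full 1024 bytes again, which is the overhead I would like to avoid.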



##
flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java:
##
@@ -243,6 +244,60 @@ public void testNonEmptyIntersection() {
 assertEquals(handle.getStateHandleId(), newHandle.getStateHandleId());
 }
 
+@Test
+public void testConcurrentCheckpointSharedStateRegistration() throws Exception {
+StateHandleID handleID = new StateHandleID("1.sst");
+StreamStateHandle streamHandle1 = new ByteStreamStateHandle("file-1", new byte[] {'s'});
+StreamStateHandle streamHandle2 = new ByteStreamStateHandle("file-2", new byte[] {'s'});
+
+SharedStateRegistry registry = new SharedStateRegistryImpl();
+
+UUID backendID = UUID.randomUUID();
+
+IncrementalRemoteKeyedStateHandle handle1 =
+new IncrementalRemoteKeyedStateHandle(
+backendID,
+KeyGroupRange.of(0, 0),
+1L,
+placeSpies(
+new HashMap() {
+{
+put(handleID, streamHandle1);
+}
+}),
+Collections.emptyMap(),
+new ByteStreamStateHandle("", new byte[] {'s'}));
+
+handle1.registerSharedStates(registry, handle1.getCheckpointId());
+
+IncrementalRemoteKeyedStateHandle handle2 =
+new IncrementalRemoteKeyedStateHandle(
+backendID,
+KeyGroupRange.of(0, 0),
+2L,
+placeSpies(
+new HashMap() {
+{
+put(handleID, streamHandle2);
+}
+}),
+Collections.emptyMap(),
+new ByteStreamStateHandle("", new byte[] {'s'}));
+
+handle2.registerSharedStates(registry,