[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1267232851

## flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java: ##
@@ -359,11 +357,12 @@ IncrementalRemoteKeyedStateHandle copy() {
 stateHandleId);
 }
-/** Create a unique key to register one of our shared state handles. */
+/** Create a unique key based on physical id to register one of our shared state handles. */
 @VisibleForTesting
-public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
+public SharedStateRegistryKey createSharedStateRegistryKey(StreamStateHandle handle) {
+String keyString = handle.getStreamStateHandleID().getKeyString();
 return new SharedStateRegistryKey(
-String.valueOf(backendIdentifier) + '-' + keyGroupRange, shId);
+ UUID.nameUUIDFromBytes(keyString.getBytes(StandardCharsets.UTF_8)).toString());

Review Comment:
>> Why do we even take the MD5 of the key string and not the key string itself? Is the reason to save memory for long key strings?
> Yes, key strings tend to be longer.

Maybe also clarify that in code?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
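As a small illustration of the point discussed above: `UUID.nameUUIDFromBytes` produces a name-based (type 3, MD5) UUID, so the resulting key has a fixed length no matter how long the physical ID string is, and equal inputs map to equal keys. The sketch below is not the PR's code; the class name, method name and the example path are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

final class RegistryKeySketch {

    /** Derives a fixed-length key string from an arbitrarily long physical ID string. */
    static String registryKeyFor(String physicalId) {
        // nameUUIDFromBytes hashes the bytes with MD5 and formats the digest as a type 3 UUID,
        // so the same input always yields the same 36-character key.
        return UUID.nameUUIDFromBytes(physicalId.getBytes(StandardCharsets.UTF_8)).toString();
    }

    public static void main(String[] args) {
        // Hypothetical physical ID; real key strings depend on the handle implementation.
        String longId = "hdfs://namenode/flink/checkpoints/some-job/shared/some-long-file-name";
        String key = registryKeyFor(longId);
        System.out.println(longId.length() + " chars -> " + key.length() + " chars: " + key);
    }
}
```

The trade-off is that the key is no longer human-readable, which is one more reason a short code comment explaining the hashing may help.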
[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1267223874

## flink-runtime/src/test/java/org/apache/flink/runtime/state/DiscardRecordedStateObject.java: ##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.TernaryBoolean;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** A test mock of {@link StateObject} which need record itself whether been discarded. */
+public interface DiscardRecordedStateObject extends StateObject {

Review Comment:
Do I understand correctly that this class was added to avoid using mocks for state handles? Although that's a good change IMO, I think it would be nice to extract it into a separate commit, because the change touches tests of a very sensitive part (SharedStateRegistry).

## flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java: ## @@ -167,7 +152,6 @@ public StreamStateHandle registerReference( } } // end of synchronized (registeredStates) -scheduleAsyncDelete(scheduledStateDeletion); Review Comment: Can you please expand the commit message to clarify why this is not necessary anymore? And probably rename the commit from `[hotfix][state-changelog] not trigger new materialization until previous one confirmed or failed or cancelled` to `[hotfix][state-changelog] Don't trigger new materialization unless the previous one is confirmed/failed/cancelled` ## flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java: ## @@ -359,11 +357,12 @@ IncrementalRemoteKeyedStateHandle copy() { stateHandleId); } -/** Create a unique key to register one of our shared state handles. */ +/** Create a unique key based on physical id to register one of our shared state handles. */ @VisibleForTesting -public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) { +public SharedStateRegistryKey createSharedStateRegistryKey(StreamStateHandle handle) { +String keyString = handle.getStreamStateHandleID().getKeyString(); return new SharedStateRegistryKey( -String.valueOf(backendIdentifier) + '-' + keyGroupRange, shId); + UUID.nameUUIDFromBytes(keyString.getBytes(StandardCharsets.UTF_8)).toString()); Review Comment: Maybe also clarify that in code? ## flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java: ## @@ -359,11 +357,12 @@ IncrementalRemoteKeyedStateHandle copy() { stateHandleId); } -/** Create a unique key to register one of our shared state handles. */ +/** Create a unique key based on physical id to register one of our shared state handles. 
*/ @VisibleForTesting -public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) { +public SharedStateRegistryKey createSharedStateRegistryKey(StreamStateHandle handle) { +String keyString = handle.getStreamStateHandleID().getKeyString(); Review Comment: WDYT about renaming the corresponding commit from `[FLINK-29913][checkpointing] make IncrementalRemoteKeyedStateHandle register shared state use PhysicalStateHandleID as register key` to something like `[FLINK-29913][checkpointing] Use PhysicalStateHandleID as a key for shared state of IncrementalRemoteKeyedStateHandle` and describe the problem this change solves in the commit message? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
rkhachatryan commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1242179978 ## flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java: ## @@ -67,14 +67,22 @@ public interface MaterializationTarget { /** * This method is not thread safe. It should be called either under a lock or through task - * mailbox executor. + * mailbox executor. Implementations should ensure that not to trigger materialization until + * the previous one not confirmed or failed. */ void handleMaterializationResult( SnapshotResult materializedSnapshot, long materializationID, SequenceNumber upTo) throws Exception; +/** + * This method is not thread safe. It should be called either under a lock or through task + * mailbox executor. + */ Review Comment: 1. This is true for all the methods of this class, and is usually the case with most Flink code 2. Otherwise, use `@NotThreadSafe` on class? ## flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java: ## @@ -67,14 +67,22 @@ public interface MaterializationTarget { /** * This method is not thread safe. It should be called either under a lock or through task - * mailbox executor. + * mailbox executor. Implementations should ensure that not to trigger materialization until + * the previous one not confirmed or failed. Review Comment: ```suggestion * mailbox executor. Implementations should not trigger materialization until * the previous one has been confirmed or failed. 
``` ## flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java: ## @@ -728,6 +731,7 @@ private ChangelogSnapshotState completeRestore( materializationId = Math.max(materializationId, h.getMaterializationID()); } } +this.lastFailedMaterializationId = materializationId; Review Comment: Why do we set `lastFailedMaterializationId` to "restore" `materializationId`? ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java: ## @@ -390,23 +393,36 @@ public void release() { } protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT = -new PreviousSnapshot(Collections.emptyMap()); +new PreviousSnapshot(Collections.emptyList()); /** Previous snapshot with uploaded sst files. */ protected static class PreviousSnapshot { -@Nullable private final Map confirmedSstFiles; - -protected PreviousSnapshot(@Nullable Map confirmedSstFiles) { -this.confirmedSstFiles = confirmedSstFiles; +@Nonnull private final Map confirmedSstFiles; + +protected PreviousSnapshot(@Nullable Collection confirmedSstFiles) { +this.confirmedSstFiles = +confirmedSstFiles != null +? 
confirmedSstFiles.stream()
+.collect(
+Collectors.toMap(
+ HandleAndLocalPath::getLocalPath,
+ HandleAndLocalPath::getHandle))
+: Collections.emptyMap();
 }
-protected Optional getUploaded(StateHandleID stateHandleID) {
-if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
-// we introduce a placeholder state handle, that is replaced with the
-// original from the shared state registry (created from a previous checkpoint)
-return Optional.of(
-new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));
+protected Optional getUploaded(String filename) {
+if (confirmedSstFiles.containsKey(filename)) {
+StreamStateHandle handle = confirmedSstFiles.get(filename);
+if (handle instanceof ByteStreamStateHandle) {

Review Comment:
I think this check doesn't add any value. It adds complexity through two more lines of code and, more importantly, by making developers think about special cases.

## flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java: ##
@@ -758,6 +762,15 @@ private ChangelogSnapshotState completeRestore(
 */
 @Override
 public Optional initMaterialization() throws Exception {
+if
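For reference, the null-tolerant `PreviousSnapshot` constructor and a lookup without the per-type `instanceof` check can be sketched as below. This is a simplified stand-in, not the PR's code: handles are modelled as plain strings, `HandleAndLocalPath` is re-declared locally with only the two getters visible in the hunk, and the placeholder wrapping is left out.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

final class PreviousSnapshotSketch {

    /** Simplified stand-in for the PR's HandleAndLocalPath; the handle is just a String here. */
    static final class HandleAndLocalPath {
        private final String handle;
        private final String localPath;

        HandleAndLocalPath(String handle, String localPath) {
            this.handle = handle;
            this.localPath = localPath;
        }

        String getHandle() {
            return handle;
        }

        String getLocalPath() {
            return localPath;
        }
    }

    // Never null: a missing collection becomes an empty map, so lookups need no null check.
    private final Map<String, String> confirmedSstFiles;

    PreviousSnapshotSketch(Collection<HandleAndLocalPath> confirmed) {
        this.confirmedSstFiles =
                confirmed != null
                        ? confirmed.stream()
                                .collect(
                                        Collectors.toMap(
                                                HandleAndLocalPath::getLocalPath,
                                                HandleAndLocalPath::getHandle))
                        : Collections.emptyMap();
    }

    /** Returns the previously uploaded handle for a local file name, treating all handle types alike. */
    Optional<String> getUploaded(String filename) {
        return Optional.ofNullable(confirmedSstFiles.get(filename));
    }
}
```

Note that `Collectors.toMap` throws on duplicate keys, so this form assumes each local path appears at most once in the collection.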
[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
rkhachatryan commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1234528219 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java: ## @@ -122,37 +121,20 @@ public StreamStateHandle registerReference( LOG.trace( "Duplicated registration under key {} with a placeholder (normal case)", registrationKey); -scheduledStateDeletion = newHandle; -} else if (entry.confirmed) { -LOG.info( -"Duplicated registration under key {} of a new state: {}. " -+ "This might happen if checkpoint confirmation was delayed and state backend re-uploaded the state. " -+ "Discarding the new state and keeping the old one which is included into a completed checkpoint", -registrationKey, -newHandle); -scheduledStateDeletion = newHandle; } else { -// Old entry is not in a confirmed checkpoint yet, and the new one differs. -// This might result from (omitted KG range here for simplicity): -// 1. Flink recovers from a failure using a checkpoint 1 -// 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst } -// 3. JM triggers checkpoint 2 -// 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst" -// 5. TM crashes; everything is repeated from (2) -// 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst } -// 7. JM triggers checkpoint 3 -// 8. TM sends NEW state "xyz-002.sst" -// 9. JM discards it as duplicate -// 10. checkpoint completes, but a wrong SST file is used -// So we use a new entry and discard the old one: +// might be a bug expect the StreamStateHandleWrapper used by +// ChangelogStateBackendHandleImpl LOG.info( -"Duplicated registration under key {} of a new state: {}. " -+ "This might happen during the task failover if state backend creates different states with the same key before and after the failure. 
" -+ "Discarding the OLD state and keeping the NEW one which is included into a completed checkpoint", +"the registered handle should equal to the previous one or is a placeholder, register key:{}, handle:{}", registrationKey, newHandle); -scheduledStateDeletion = entry.stateHandle; -entry.stateHandle = newHandle; +if (entry.stateHandle instanceof EmptyDiscardStateObjectForRegister) { Review Comment: Otherwise throw an exception? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
rkhachatryan commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1221117363 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java: ## @@ -395,18 +394,16 @@ public void release() { /** Previous snapshot with uploaded sst files. */ protected static class PreviousSnapshot { -@Nullable private final Map confirmedSstFiles; +@Nullable private final Map confirmedSstFiles; -protected PreviousSnapshot(@Nullable Map confirmedSstFiles) { +protected PreviousSnapshot( +@Nullable Map confirmedSstFiles) { this.confirmedSstFiles = confirmedSstFiles; } protected Optional getUploaded(StateHandleID stateHandleID) { if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) { -// we introduce a placeholder state handle, that is replaced with the -// original from the shared state registry (created from a previous checkpoint) -return Optional.of( -new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID))); Review Comment: Thanks for yours too @zoltar9264 Sounds good, I like this approach -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1220434655

## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java: ##
@@ -395,18 +394,16 @@ public void release() {
 /** Previous snapshot with uploaded sst files. */
 protected static class PreviousSnapshot {
-@Nullable private final Map confirmedSstFiles;
+@Nullable private final Map confirmedSstFiles;
-protected PreviousSnapshot(@Nullable Map confirmedSstFiles) {
+protected PreviousSnapshot(
+@Nullable Map confirmedSstFiles) {
 this.confirmedSstFiles = confirmedSstFiles;
 }
 protected Optional getUploaded(StateHandleID stateHandleID) {
 if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
-// we introduce a placeholder state handle, that is replaced with the
-// original from the shared state registry (created from a previous checkpoint)
-return Optional.of(
-new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment:
Thanks @zoltar9264,

> For example, the registry in this issue is wrongly de-duplicated.
> If we only perform the replacement when the placeholder is used, and reduce the scope of the placeholder, it is possible to avoid the problem caused by the registry returning the wrong handle.

But this issue is not related to the `PlaceholderStreamStateHandle`, is it? It happens when both new and old handles are NOT `PlaceholderStreamStateHandle`s.

> Furthermore, if we check that the handle returned by the registry is equal to the registered handle when the placeholder is not used, we can detect the problem earlier.

I think the problem actually arises when the handles under the same key are NOT equal, and we need to resolve this conflict somehow. Note that this is not related to `PlaceholderStreamStateHandle`.

But since in this PR we entirely eliminate the possibility of collision by using unique keys, such a collision would mean a bug; so we can remove that resolution logic and raise an error instead:
- remove `SharedStateRegistryImpl.SharedStateEntry.confirmed` field
- remove any calls to `scheduleAsyncDelete()` from `registerReference()`
- (but keep `PlaceholderStreamStateHandle`)

WDYT?
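The proposal above can be sketched roughly as follows. This is a toy model, not `SharedStateRegistryImpl`: handles are plain strings, the placeholder is a boolean flag, and treating a placeholder without a registered original as an error is an assumption of the sketch. The class and method shapes are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class StrictRegistrySketch {

    // key -> registered handle (modelled as a String for brevity)
    private final Map<String, String> registered = new HashMap<>();

    /**
     * Registers a handle under a key and returns the handle the caller should use.
     * With unique keys, two different non-placeholder handles under one key indicate a bug,
     * so the conflict is reported instead of being resolved by discarding one of them.
     */
    synchronized String registerReference(String key, String newHandle, boolean isPlaceholder) {
        String existing = registered.get(key);
        if (existing == null) {
            if (isPlaceholder) {
                // Assumption of this sketch: a placeholder must refer to an earlier registration.
                throw new IllegalStateException("Placeholder without a registered original: " + key);
            }
            registered.put(key, newHandle);
            return newHandle;
        }
        if (isPlaceholder || existing.equals(newHandle)) {
            // Normal case: the placeholder is replaced by the original, or the same handle is re-registered.
            return existing;
        }
        throw new IllegalStateException(
                "Conflicting handles under key " + key + ": " + existing + " vs " + newHandle);
    }
}
```

In this shape there is no `confirmed` flag and nothing is scheduled for deletion during registration; a conflict surfaces immediately as an exception.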
[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
rkhachatryan commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1218292582 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java: ## @@ -395,18 +394,16 @@ public void release() { /** Previous snapshot with uploaded sst files. */ protected static class PreviousSnapshot { -@Nullable private final Map confirmedSstFiles; +@Nullable private final Map confirmedSstFiles; -protected PreviousSnapshot(@Nullable Map confirmedSstFiles) { +protected PreviousSnapshot( +@Nullable Map confirmedSstFiles) { this.confirmedSstFiles = confirmedSstFiles; } protected Optional getUploaded(StateHandleID stateHandleID) { if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) { -// we introduce a placeholder state handle, that is replaced with the -// original from the shared state registry (created from a previous checkpoint) -return Optional.of( -new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID))); Review Comment: > So I think String is clearer and enough ? I think `String` should work too. > About 'only use PlaceholderStreamStateHandle while the origin handle is ByteStreamStateHandle': In fact I want to suggest developers to do this in the doc of PlaceholderStreamStateHandle and SharedStateRegistry. Perhaps some bugs could have been avoided if developers followed this advice and only performed replacements on PlaceholderStreamStateHandle . I'm not sure if this is over-designed, if so please correct me. I'm sorry but I'm still missing what's the issue with using `PlaceholderStreamStateHandle` for **any** kind of handles? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1214051048

## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java: ##
@@ -395,18 +394,16 @@ public void release() {
 /** Previous snapshot with uploaded sst files. */
 protected static class PreviousSnapshot {
-@Nullable private final Map confirmedSstFiles;
+@Nullable private final Map confirmedSstFiles;
-protected PreviousSnapshot(@Nullable Map confirmedSstFiles) {
+protected PreviousSnapshot(
+@Nullable Map confirmedSstFiles) {
 this.confirmedSstFiles = confirmedSstFiles;
 }
 protected Optional getUploaded(StateHandleID stateHandleID) {
 if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
-// we introduce a placeholder state handle, that is replaced with the
-// original from the shared state registry (created from a previous checkpoint)
-return Optional.of(
-new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment:
Thanks @zoltar9264,

> Do you mean change IncrementalRemoteKeyedStateHandle like this ?

Yes, I mean something like that, maybe using `Path` or `File` instead of `String` for `localPath`.

> I suggest only use PlaceholderStreamStateHandle while the origin handle is ByteStreamStateHandle. This pr already implemented not use PlaceholderStreamStateHandle calculate checkpointed size, I want keep it.

Can you share the motivation? I think that this will just add an additional `instanceof` and increase complexity. It would also be easier to break if we add a new type of handle that needs replacement. Or am I missing something?
[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
rkhachatryan commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1212720075 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java: ## @@ -395,18 +394,16 @@ public void release() { /** Previous snapshot with uploaded sst files. */ protected static class PreviousSnapshot { -@Nullable private final Map confirmedSstFiles; +@Nullable private final Map confirmedSstFiles; -protected PreviousSnapshot(@Nullable Map confirmedSstFiles) { +protected PreviousSnapshot( +@Nullable Map confirmedSstFiles) { this.confirmedSstFiles = confirmedSstFiles; } protected Optional getUploaded(StateHandleID stateHandleID) { if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) { -// we introduce a placeholder state handle, that is replaced with the -// original from the shared state registry (created from a previous checkpoint) -return Optional.of( -new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID))); Review Comment: > But I still recommend limiting its usage to ByteStreamStateHandle, and don't use it to calculate checkpointed size. IIUC, that's already implemented. Why would you change this? > And I suggest saving the PhysicalStateHandleID of the original StateHandle into the PlaceholderStreamStateHandle to simplify the registration. That might not be necessary, if the registration key is the same key as in `IncrementalRemoteKeyedStateHandle.sharedState`, as per [comment](https://github.com/apache/flink/pull/22669#discussion_r1210450731). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1212450869

## flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java: ##
@@ -324,7 +325,7 @@ public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpo
 for (Map.Entry sharedStateHandle : sharedState.entrySet()) {
 SharedStateRegistryKey registryKey =
- createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
+createSharedStateRegistryKey(sharedStateHandle.getValue());

Review Comment:
I'd rather see it as part of this ticket, because it's directly related and one should NOT be merged without the other. Why would you implement this as a separate ticket?
[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
rkhachatryan commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1212449390 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java: ## @@ -361,9 +362,10 @@ IncrementalRemoteKeyedStateHandle copy() { /** Create a unique key to register one of our shared state handles. */ @VisibleForTesting -public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) { +public SharedStateRegistryKey createSharedStateRegistryKey(StreamStateHandle handle) { +String keyString = handle.getStreamStateHandleID().getKeyString(); return new SharedStateRegistryKey( -String.valueOf(backendIdentifier) + '-' + keyGroupRange, shId); + UUID.nameUUIDFromBytes(keyString.getBytes(StandardCharsets.UTF_8)).toString()); Review Comment: Yes, I think you're right. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
rkhachatryan commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1212449200 ## flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java: ## @@ -243,6 +244,60 @@ public void testNonEmptyIntersection() { assertEquals(handle.getStateHandleId(), newHandle.getStateHandleId()); } +@Test +public void testConcurrentCheckpointSharedStateRegistration() throws Exception { +StateHandleID handleID = new StateHandleID("1.sst"); +StreamStateHandle streamHandle1 = new ByteStreamStateHandle("file-1", new byte[] {'s'}); +StreamStateHandle streamHandle2 = new ByteStreamStateHandle("file-2", new byte[] {'s'}); + +SharedStateRegistry registry = new SharedStateRegistryImpl(); + +UUID backendID = UUID.randomUUID(); + +IncrementalRemoteKeyedStateHandle handle1 = +new IncrementalRemoteKeyedStateHandle( +backendID, +KeyGroupRange.of(0, 0), +1L, +placeSpies( +new HashMap() { +{ +put(handleID, streamHandle1); +} +}), Review Comment: Fair enough. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
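On the double-brace `HashMap` initialisation in the hunk above: `Collections.singletonMap` is the shorter form, with the difference that the map it returns is immutable. Whether that matters for `placeSpies` is not stated in this thread; the sketch below only shows the general behaviour, using made-up string keys and values.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class SingletonMapSketch {

    /** Double-brace initialisation: an anonymous HashMap subclass, mutable. */
    static Map<String, String> viaDoubleBrace() {
        return new HashMap<String, String>() {
            {
                put("1.sst", "file-1");
            }
        };
    }

    /** Same content, but immutable: put() throws UnsupportedOperationException. */
    static Map<String, String> viaSingletonMap() {
        return Collections.singletonMap("1.sst", "file-1");
    }

    /** Mutable copy, for callers that need to modify the map afterwards. */
    static Map<String, String> viaMutableCopy() {
        return new HashMap<>(Collections.singletonMap("1.sst", "file-1"));
    }
}
```

All three maps compare equal via `equals`; only the first and the last accept later modifications.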
[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1212443717

## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java: ##
@@ -395,18 +394,16 @@ public void release() {
 /** Previous snapshot with uploaded sst files. */
 protected static class PreviousSnapshot {
-@Nullable private final Map confirmedSstFiles;
+@Nullable private final Map confirmedSstFiles;
-protected PreviousSnapshot(@Nullable Map confirmedSstFiles) {
+protected PreviousSnapshot(
+@Nullable Map confirmedSstFiles) {
 this.confirmedSstFiles = confirmedSstFiles;
 }
 protected Optional getUploaded(StateHandleID stateHandleID) {
 if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
-// we introduce a placeholder state handle, that is replaced with the
-// original from the shared state registry (created from a previous checkpoint)
-return Optional.of(
-new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment:
Thanks for the explanation @zoltar9264

My concern is exactly about the additional overhead of data transmission (the purpose of the placeholder is to reduce it). I'm not sure that removing `PlaceholderStreamStateHandle` won't cause a regression. It depends not only on the state size, but also on the configuration (`state.storage.fs.memory-threshold`) and the actual size of the SST files.

So I think it would be safer to keep it. I agree that this requires more code changes, but I think it's worth it. The complexity added by `PlaceholderStreamStateHandle` doesn't seem very high.

WDYT?
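The transmission concern can be made concrete with a deliberately crude model. It assumes that a file smaller than `state.storage.fs.memory-threshold` is inlined into its handle as bytes, that a file-backed handle and a placeholder carry no payload worth counting, and it ignores all fixed per-handle overhead. The class, the method and the numbers are hypothetical.

```java
final class PlaceholderOverheadSketch {

    /**
     * Payload bytes sent to the JobManager in one checkpoint for one file
     * that was already uploaded in an earlier checkpoint.
     */
    static long payloadBytesSent(long fileSize, long memoryThreshold, boolean usePlaceholder) {
        if (usePlaceholder) {
            // The placeholder stands in for the original handle, so no payload is re-sent.
            return 0L;
        }
        // Assumption: state below the threshold lives inside the handle itself.
        boolean inlined = fileSize < memoryThreshold;
        return inlined ? fileSize : 0L;
    }

    public static void main(String[] args) {
        long threshold = 20 * 1024; // hypothetical threshold value
        long smallSst = 4 * 1024;   // hypothetical small SST file
        int reusedFiles = 500;      // hypothetical number of files reused per checkpoint

        long without = reusedFiles * payloadBytesSent(smallSst, threshold, false);
        long with = reusedFiles * payloadBytesSent(smallSst, threshold, true);
        System.out.println("without placeholder: " + without + " bytes, with placeholder: " + with + " bytes");
    }
}
```

Under these assumptions the cost without placeholders grows with the number of small reused files per checkpoint, which is why the outcome depends on both the threshold and the actual SST sizes.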
[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1
rkhachatryan commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1210462542 ## flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java: ## @@ -243,6 +244,60 @@ public void testNonEmptyIntersection() { assertEquals(handle.getStateHandleId(), newHandle.getStateHandleId()); } +@Test +public void testConcurrentCheckpointSharedStateRegistration() throws Exception { +StateHandleID handleID = new StateHandleID("1.sst"); +StreamStateHandle streamHandle1 = new ByteStreamStateHandle("file-1", new byte[] {'s'}); +StreamStateHandle streamHandle2 = new ByteStreamStateHandle("file-2", new byte[] {'s'}); + +SharedStateRegistry registry = new SharedStateRegistryImpl(); + +UUID backendID = UUID.randomUUID(); + +IncrementalRemoteKeyedStateHandle handle1 = +new IncrementalRemoteKeyedStateHandle( +backendID, +KeyGroupRange.of(0, 0), +1L, +placeSpies( +new HashMap() { +{ +put(handleID, streamHandle1); +} +}), Review Comment: `Collections.singletonMap`? ditto: line 279 ## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java: ## @@ -395,18 +394,16 @@ public void release() { /** Previous snapshot with uploaded sst files. 
*/ protected static class PreviousSnapshot { -@Nullable private final Map confirmedSstFiles; +@Nullable private final Map confirmedSstFiles; -protected PreviousSnapshot(@Nullable Map confirmedSstFiles) { +protected PreviousSnapshot( +@Nullable Map confirmedSstFiles) { this.confirmedSstFiles = confirmedSstFiles; } protected Optional getUploaded(StateHandleID stateHandleID) { if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) { -// we introduce a placeholder state handle, that is replaced with the -// original from the shared state registry (created from a previous checkpoint) -return Optional.of( -new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID))); Review Comment: Could you elaborate on why you had to delete `PlaceholderStreamStateHandle`? I'm concerned that with this change, `ByteStreamStateHandle`s will always be sent to the JM (regardless of whether they were previously sent or not). ## flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java: ## @@ -243,6 +244,60 @@ public void testNonEmptyIntersection() { assertEquals(handle.getStateHandleId(), newHandle.getStateHandleId()); } +@Test +public void testConcurrentCheckpointSharedStateRegistration() throws Exception { +StateHandleID handleID = new StateHandleID("1.sst"); +StreamStateHandle streamHandle1 = new ByteStreamStateHandle("file-1", new byte[] {'s'}); +StreamStateHandle streamHandle2 = new ByteStreamStateHandle("file-2", new byte[] {'s'}); + +SharedStateRegistry registry = new SharedStateRegistryImpl(); + +UUID backendID = UUID.randomUUID(); + +IncrementalRemoteKeyedStateHandle handle1 = +new IncrementalRemoteKeyedStateHandle( +backendID, +KeyGroupRange.of(0, 0), +1L, +placeSpies( +new HashMap() { +{ +put(handleID, streamHandle1); +} +}), +Collections.emptyMap(), +new ByteStreamStateHandle("", new byte[] {'s'})); + +handle1.registerSharedStates(registry, handle1.getCheckpointId()); + 
+IncrementalRemoteKeyedStateHandle handle2 = +new IncrementalRemoteKeyedStateHandle( +backendID, +KeyGroupRange.of(0, 0), +2L, +placeSpies( +new HashMap() { +{ +put(handleID, streamHandle2); +} +}), +Collections.emptyMap(), +new ByteStreamStateHandle("", new byte[] {'s'})); + +handle2.registerSharedStates(registry,