[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5578

---
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171181196

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -90,6 +92,9 @@
     @GuardedBy("lock")
     private boolean disposed;
+    /** Whether to discard the useless state when retrieve local checkpoint state. */
+    private boolean retrieveWithDiscard = true;
--- End diff --

Agreed, and I prefer the second option.

---
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171179946

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -90,6 +92,9 @@
     @GuardedBy("lock")
     private boolean disposed;
+    /** Whether to discard the useless state when retrieve local checkpoint state. */
+    private boolean retrieveWithDiscard = true;
--- End diff --

Then there are two better options in my opinion, because the flag is pure boilerplate:

- Change the test to check what we are doing now, because that is what happens in the real use-case.
- Maybe even better: split the method `retrieveLocalState` further: one method for pruning, and one package-private method that does all the pure retrieval, logging, and `null` transformation. In the old `retrieveLocalState`, do the cleanup first, then the pure retrieval/logging. Call the package-private method in the test. Maybe the test should then also just do both?

---
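A minimal sketch of the second option, assuming a simplified store where snapshots are plain `String`s and pruning just removes map entries. All names here (`SplitRetrievalStore`, `retrieveWithoutPruning`, `pruneAllExcept`) are hypothetical; this is not the code that was eventually merged into Flink.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for TaskLocalStateStoreImpl:
// snapshots are Strings, and "discarding" just removes the map entry.
class SplitRetrievalStore {
    private final Object lock = new Object();
    private final SortedMap<Long, String> storedByCheckpointId = new TreeMap<>();

    void store(long checkpointId, String snapshot) {
        synchronized (lock) {
            storedByCheckpointId.put(checkpointId, snapshot);
        }
    }

    /** Production entry point: clean up first, then do the pure retrieval. */
    String retrieveLocalState(long checkpointId) {
        synchronized (lock) {
            pruneAllExcept(checkpointId);
            return retrieveWithoutPruning(checkpointId);
        }
    }

    /** Package-private pure retrieval (logging and null handling would live here); tests call it directly. */
    String retrieveWithoutPruning(long checkpointId) {
        synchronized (lock) { // re-entrant when called from retrieveLocalState
            return storedByCheckpointId.get(checkpointId);
        }
    }

    /** Drops every stored snapshot except the requested one; callers must hold lock. */
    private void pruneAllExcept(long checkpointId) {
        storedByCheckpointId.keySet().removeIf(id -> id != checkpointId);
    }

    int size() {
        synchronized (lock) {
            return storedByCheckpointId.size();
        }
    }
}
```

With this split, a test loop like `checkStoredAsExpected` could call the pure retrieval method and keep passing, while the production path always prunes, so no flag is needed.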
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171178975

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -300,6 +295,34 @@ private void deleteDirectory(File directory) throws IOException {
         }
     }
+    /**
+     * Pruning the useless checkpoints, it should be called only when holding the {@link #lock}.
+     */
+    private void pruneCheckpoints(Predicate<Long> pruningChecker, boolean breakOnceCheckerFalse) {
--- End diff --

👍 I prefer to use `LongPredicate`; addressing ...

---
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171177343

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -300,6 +295,34 @@ private void deleteDirectory(File directory) throws IOException {
         }
     }
+    /**
+     * Pruning the useless checkpoints, it should be called only when holding the {@link #lock}.
+     */
+    private void pruneCheckpoints(Predicate<Long> pruningChecker, boolean breakOnceCheckerFalse) {
--- End diff --

We can use `LongPredicate` instead of `Predicate`.

---
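A sketch of what a predicate-driven `pruneCheckpoints(...)` could look like with `java.util.function.LongPredicate`, assuming entries are iterated in ascending checkpoint-ID order (a `SortedMap`). The class name `PredicatePruning` and the `String` snapshots are hypothetical simplifications; the pruned snapshots are returned instead of being handed to an asynchronous discarder.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.LongPredicate;

// Hypothetical sketch: the caller decides which checkpoint IDs to prune via a LongPredicate.
class PredicatePruning {
    static List<String> pruneCheckpoints(
            SortedMap<Long, String> storedByCheckpointId,
            LongPredicate pruningChecker,
            boolean breakOnceCheckerFalse) {

        List<String> toDiscard = new ArrayList<>();
        Iterator<Map.Entry<Long, String>> entryIterator = storedByCheckpointId.entrySet().iterator();
        while (entryIterator.hasNext()) {
            Map.Entry<Long, String> entry = entryIterator.next();
            // LongPredicate.test takes a primitive long; the boxed key is unboxed here
            if (pruningChecker.test(entry.getKey())) {
                toDiscard.add(entry.getValue()); // read the value before removing the entry
                entryIterator.remove();
            } else if (breakOnceCheckerFalse) {
                break; // ascending order: nothing after this point should be pruned
            }
        }
        return toDiscard;
    }
}
```

The caller then encodes the policy: `id -> id < checkpointId` with breaking for a confirmed checkpoint, and `id -> id != checkpointId` without breaking for retrieval, which keeps the `if` inside the loop trivial.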
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171177186

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws IOException {
         }
     }
+    /**
+     * Pruning the useless checkpoints.
+     */
+    private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) {
+
+        Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
+            storedTaskStateByCheckpointID.entrySet().iterator();
+
+        final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
+
+        while (entryIterator.hasNext()) {
+
+            Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next();
+            long entryCheckpointId = snapshotEntry.getKey();
+
+            if (entryCheckpointId != checkpointID) {
--- End diff --

That is fine; from my point of view, that is just one way of making the `if` more complex.

---
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171133066

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws IOException {
         }
     }
+    /**
+     * Pruning the useless checkpoints.
+     */
+    private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) {
+
+        Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
+            storedTaskStateByCheckpointID.entrySet().iterator();
+
+        final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
+
+        while (entryIterator.hasNext()) {
+
+            Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next();
+            long entryCheckpointId = snapshotEntry.getKey();
+
+            if (entryCheckpointId != checkpointID) {
--- End diff --

I agree with you that the breaking case looks a bit dangerous... I think maybe we could pass a `Predicate` for the `if` and let the caller pass it into this function. This would make the caller side cleaner, and we would not need to cram the logic into the `if` and make it complex.

---
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171130767

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -159,6 +166,11 @@ public TaskStateSnapshot retrieveLocalState(long checkpointID) {
         TaskStateSnapshot snapshot;
         synchronized (lock) {
             snapshot = storedTaskStateByCheckpointID.get(checkpointID);
+
+            if (retrieveWithDiscard) {
+                // Only the TaskStateSnapshot.checkpointID == checkpointID is useful, we remove the others
--- End diff --

👍

---
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171130750

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws IOException {
         }
     }
+    /**
+     * Pruning the useless checkpoints.
+     */
+    private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) {
+
--- End diff --

👍

---
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171130718

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -90,6 +92,9 @@
     @GuardedBy("lock")
     private boolean disposed;
+    /** Whether to discard the useless state when retrieve local checkpoint state. */
+    private boolean retrieveWithDiscard = true;
--- End diff --

Aha, this is just for passing the existing test case in `TaskLocalStateStoreImplTest`:

```java
private void checkStoredAsExpected(List<TaskStateSnapshot> history, int off, int len) throws Exception {
    for (int i = off; i < len; ++i) {
        TaskStateSnapshot expected = history.get(i);
        Assert.assertTrue(expected == taskLocalStateStore.retrieveLocalState(i));
        Mockito.verify(expected, Mockito.never()).discardState();
    }
}
```

---
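To see why unconditional cleanup breaks this loop, here is a hypothetical model (not Flink code; `PruneOnRetrieveDemo` and its `String` snapshots are assumptions) of a `retrieveLocalState` that always drops every other checkpoint: after retrieving checkpoint 0, checkpoint 1 is already gone, so `expected == retrieveLocalState(1)` can no longer hold.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical model: retrieval always prunes every checkpoint except the requested one.
class PruneOnRetrieveDemo {
    private final SortedMap<Long, String> stored = new TreeMap<>();

    void store(long checkpointId, String snapshot) {
        stored.put(checkpointId, snapshot);
    }

    String retrieveLocalState(long checkpointId) {
        String snapshot = stored.get(checkpointId);
        // cleanup on every retrieval: only the requested checkpoint survives
        stored.keySet().removeIf(id -> id != checkpointId);
        return snapshot;
    }
}
```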
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170975679

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -159,6 +166,11 @@ public TaskStateSnapshot retrieveLocalState(long checkpointID) {
         TaskStateSnapshot snapshot;
         synchronized (lock) {
             snapshot = storedTaskStateByCheckpointID.get(checkpointID);
+
+            if (retrieveWithDiscard) {
+                // Only the TaskStateSnapshot.checkpointID == checkpointID is useful, we remove the others
--- End diff --

Comment is no longer required.

---
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170975600

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws IOException {
         }
     }
+    /**
+     * Pruning the useless checkpoints.
+     */
+    private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) {
+
--- End diff --

I suggest adding an assert that the thread holds `lock`, and documenting that this method should only be called while holding the lock.

---
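A minimal sketch of the suggested precondition check, using the standard `Thread.holdsLock(Object)`; the class `LockGuardedStore` and its counter are hypothetical. The `assert` only fires when the JVM runs with assertions enabled (`-ea`), so it documents and checks the contract in tests without affecting production runs.

```java
// Hypothetical class illustrating a "must hold lock" precondition.
class LockGuardedStore {
    private final Object lock = new Object();
    private int pruneCount;

    /** Prunes checkpoints. Must only be called while holding {@link #lock}. */
    private void pruneCheckpoints() {
        assert Thread.holdsLock(lock) : "pruneCheckpoints() must be called while holding lock";
        pruneCount++;
    }

    /** Correct usage: acquire the lock, then prune. Returns how often pruning has run. */
    int confirmCheckpoint() {
        synchronized (lock) {
            pruneCheckpoints();
            return pruneCount;
        }
    }

    /** True if the calling thread currently owns this store's lock. */
    boolean callerHoldsLock() {
        return Thread.holdsLock(lock);
    }
}
```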
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170975985

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -90,6 +92,9 @@
     @GuardedBy("lock")
     private boolean disposed;
+    /** Whether to discard the useless state when retrieve local checkpoint state. */
+    private boolean retrieveWithDiscard = true;
--- End diff --

Why do we need this? Is there any case for not doing the cleanup?

---
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170982734

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws IOException {
         }
     }
+    /**
+     * Pruning the useless checkpoints.
+     */
+    private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) {
+
+        Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
+            storedTaskStateByCheckpointID.entrySet().iterator();
+
+        final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
+
+        while (entryIterator.hasNext()) {
+
+            Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next();
+            long entryCheckpointId = snapshotEntry.getKey();
+
+            if (entryCheckpointId != checkpointID) {
--- End diff --

On second thought: while I think this code is currently correct, the case with breaking looks a bit dangerous. Potentially, if the checkpoint ID is not there, the iteration would not stop and would prune ongoing checkpoints. I wonder if we should make the `if` a bit more complex, but safer (checking that the breaking case never exceeds the checkpoint ID). What do you think?

---
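The hazard can be reproduced with a small hypothetical model (not Flink code; `BreakingPruneHazard` and both method names are assumptions), assuming, as the break-based iteration implies, that checkpoints are iterated in ascending ID order. With `!=` plus breaking, confirming an ID that is not stored never reaches the `else` branch and also prunes newer, still-ongoing checkpoints; comparing with `<` stops at the first ID that is not older than the confirmed one.

```java
import java.util.Iterator;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical model of the two breaking variants discussed in the review.
class BreakingPruneHazard {
    /** Prune while the ID differs, break at the first match (unsafe if confirmedId is absent). */
    static void pruneWithInequality(SortedMap<Long, String> stored, long confirmedId) {
        Iterator<Long> ids = stored.keySet().iterator();
        while (ids.hasNext()) {
            if (ids.next() != confirmedId) {
                ids.remove();
            } else {
                break;
            }
        }
    }

    /** Prune only IDs older than confirmedId; never touches confirmedId or anything newer. */
    static void pruneOlderThan(SortedMap<Long, String> stored, long confirmedId) {
        Iterator<Long> ids = stored.keySet().iterator();
        while (ids.hasNext()) {
            if (ids.next() < confirmedId) {
                ids.remove();
            } else {
                break;
            }
        }
    }
}
```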
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170965807

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -90,6 +92,10 @@
     @GuardedBy("lock")
     private boolean disposed;
+    /** Whether to discard the useless state when retrieve local checkpoint state. */
+    @Nonnull
+    private boolean retrieveWithDiscard;
--- End diff --

addressed.

---
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170965699

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -285,4 +316,9 @@ public String toString() {
             ", localRecoveryConfig=" + localRecoveryConfig +
             '}';
     }
+
+    @VisibleForTesting
+    void setRetrieveWithDiscard(@Nonnull boolean retrieveWithDiscard) {
--- End diff --

addressed.

---
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170965755

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -126,30 +133,54 @@ public void storeLocalState(
         LOG.info("Storing local state for checkpoint {}.", checkpointId);
         LOG.debug("Local state for checkpoint {} is {}.", checkpointId, localState);
-        Map<Long, TaskStateSnapshot> toDiscard = new HashMap<>(16);
+        Map.Entry<Long, TaskStateSnapshot> toDiscard = null;
         synchronized (lock) {
             if (disposed) {
                 // we ignore late stores and simply discard the state.
-                toDiscard.put(checkpointId, localState);
+                toDiscard = new AbstractMap.SimpleEntry(checkpointId, localState);
             } else {
                 TaskStateSnapshot previous = storedTaskStateByCheckpointID.put(checkpointId, localState);
                 if (previous != null) {
-                    toDiscard.put(checkpointId, previous);
+                    toDiscard = new AbstractMap.SimpleEntry(checkpointId, previous);
                 }
             }
         }
-        asyncDiscardLocalStateForCollection(toDiscard.entrySet());
+        if (toDiscard != null) {
+            asyncDiscardLocalStateForCollection(Collections.singletonList(toDiscard));
+        }
     }
     @Override
     @Nullable
     public TaskStateSnapshot retrieveLocalState(long checkpointID) {
         synchronized (lock) {
             TaskStateSnapshot snapshot = storedTaskStateByCheckpointID.get(checkpointID);
+
+            Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
+                storedTaskStateByCheckpointID.entrySet().iterator();
+
+            if (retrieveWithDiscard) {
+                // Only the TaskStateSnapshot.checkpointID == checkpointID is useful, we remove the others
+                final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
--- End diff --

👍 addressed.

---
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170965682

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -126,30 +133,54 @@ public void storeLocalState(
         LOG.info("Storing local state for checkpoint {}.", checkpointId);
         LOG.debug("Local state for checkpoint {} is {}.", checkpointId, localState);
-        Map<Long, TaskStateSnapshot> toDiscard = new HashMap<>(16);
+        Map.Entry<Long, TaskStateSnapshot> toDiscard = null;
         synchronized (lock) {
             if (disposed) {
                 // we ignore late stores and simply discard the state.
-                toDiscard.put(checkpointId, localState);
+                toDiscard = new AbstractMap.SimpleEntry(checkpointId, localState);
             } else {
                 TaskStateSnapshot previous = storedTaskStateByCheckpointID.put(checkpointId, localState);
                 if (previous != null) {
-                    toDiscard.put(checkpointId, previous);
+                    toDiscard = new AbstractMap.SimpleEntry(checkpointId, previous);
                 }
             }
         }
-        asyncDiscardLocalStateForCollection(toDiscard.entrySet());
+        if (toDiscard != null) {
+            asyncDiscardLocalStateForCollection(Collections.singletonList(toDiscard));
+        }
     }
     @Override
     @Nullable
     public TaskStateSnapshot retrieveLocalState(long checkpointID) {
         synchronized (lock) {
             TaskStateSnapshot snapshot = storedTaskStateByCheckpointID.get(checkpointID);
+
+            Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
--- End diff --

addressed.

---
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170965652

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -90,6 +92,10 @@
     @GuardedBy("lock")
     private boolean disposed;
+    /** Whether to discard the useless state when retrieve local checkpoint state. */
+    @Nonnull
--- End diff --

addressed.

---
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170943110

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -90,6 +92,10 @@
     @GuardedBy("lock")
     private boolean disposed;
+    /** Whether to discard the useless state when retrieve local checkpoint state. */
+    @Nonnull
--- End diff --

This annotation does not fit here.

---
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170947390

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -126,30 +133,54 @@ public void storeLocalState(
         LOG.info("Storing local state for checkpoint {}.", checkpointId);
         LOG.debug("Local state for checkpoint {} is {}.", checkpointId, localState);
-        Map<Long, TaskStateSnapshot> toDiscard = new HashMap<>(16);
+        Map.Entry<Long, TaskStateSnapshot> toDiscard = null;
         synchronized (lock) {
             if (disposed) {
                 // we ignore late stores and simply discard the state.
-                toDiscard.put(checkpointId, localState);
+                toDiscard = new AbstractMap.SimpleEntry(checkpointId, localState);
             } else {
                 TaskStateSnapshot previous = storedTaskStateByCheckpointID.put(checkpointId, localState);
                 if (previous != null) {
-                    toDiscard.put(checkpointId, previous);
+                    toDiscard = new AbstractMap.SimpleEntry(checkpointId, previous);
                 }
             }
         }
-        asyncDiscardLocalStateForCollection(toDiscard.entrySet());
+        if (toDiscard != null) {
+            asyncDiscardLocalStateForCollection(Collections.singletonList(toDiscard));
+        }
     }
     @Override
     @Nullable
     public TaskStateSnapshot retrieveLocalState(long checkpointID) {
         synchronized (lock) {
             TaskStateSnapshot snapshot = storedTaskStateByCheckpointID.get(checkpointID);
+
+            Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
--- End diff --

I would move all the cleanup logic into a separate method that is just invoked here, to separate the concerns.

---
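The `storeLocalState` part of this diff replaces a `HashMap` that holds at most one entry with a single nullable entry. A hypothetical, simplified sketch of that pattern (the class `SingleEntryDiscardStore`, `String` snapshots, and the recording `discard` method are assumptions; the real code discards asynchronously), here with typed `AbstractMap.SimpleEntry<>` instead of the raw type: the entry is chosen under the lock, and the discard call happens after the lock is released, only when there is something to discard.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of storeLocalState(...).
class SingleEntryDiscardStore {
    private final Object lock = new Object();
    private final Map<Long, String> stored = new HashMap<>();
    private boolean disposed;
    // records what would be discarded; not thread-safe, for illustration only
    final List<Map.Entry<Long, String>> discarded = new ArrayList<>();

    void storeLocalState(long checkpointId, String localState) {
        Map.Entry<Long, String> toDiscard = null;
        synchronized (lock) {
            if (disposed) {
                // late store after dispose: discard the incoming state
                toDiscard = new AbstractMap.SimpleEntry<>(checkpointId, localState);
            } else {
                String previous = stored.put(checkpointId, localState);
                if (previous != null) {
                    // an older snapshot for the same checkpoint was replaced
                    toDiscard = new AbstractMap.SimpleEntry<>(checkpointId, previous);
                }
            }
        }
        // at most one entry, handed over after the lock is released
        if (toDiscard != null) {
            discard(Collections.singletonList(toDiscard));
        }
    }

    void dispose() {
        synchronized (lock) {
            disposed = true;
        }
    }

    private void discard(Collection<Map.Entry<Long, String>> entries) {
        discarded.addAll(entries);
    }
}
```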
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170953600

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -126,30 +133,54 @@ public void storeLocalState(
         LOG.info("Storing local state for checkpoint {}.", checkpointId);
         LOG.debug("Local state for checkpoint {} is {}.", checkpointId, localState);
-        Map<Long, TaskStateSnapshot> toDiscard = new HashMap<>(16);
+        Map.Entry<Long, TaskStateSnapshot> toDiscard = null;
         synchronized (lock) {
             if (disposed) {
                 // we ignore late stores and simply discard the state.
-                toDiscard.put(checkpointId, localState);
+                toDiscard = new AbstractMap.SimpleEntry(checkpointId, localState);
             } else {
                 TaskStateSnapshot previous = storedTaskStateByCheckpointID.put(checkpointId, localState);
                 if (previous != null) {
-                    toDiscard.put(checkpointId, previous);
+                    toDiscard = new AbstractMap.SimpleEntry(checkpointId, previous);
                 }
             }
         }
-        asyncDiscardLocalStateForCollection(toDiscard.entrySet());
+        if (toDiscard != null) {
+            asyncDiscardLocalStateForCollection(Collections.singletonList(toDiscard));
+        }
     }
     @Override
     @Nullable
     public TaskStateSnapshot retrieveLocalState(long checkpointID) {
         synchronized (lock) {
             TaskStateSnapshot snapshot = storedTaskStateByCheckpointID.get(checkpointID);
+
+            Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
+                storedTaskStateByCheckpointID.entrySet().iterator();
+
+            if (retrieveWithDiscard) {
+                // Only the TaskStateSnapshot.checkpointID == checkpointID is useful, we remove the others
+                final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
--- End diff --

I think we can de-duplicate code by extracting a method with the code from `confirmCheckpoint(...)`, maybe called `pruneCheckpoints(...)`. We can do the comparison for both use-cases as `entryCheckpointId != checkpointID` and have a boolean parameter that determines whether we break the iteration in the `else` case.

---
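A hypothetical sketch of the proposed de-duplication: one helper serves both call sites, `confirmCheckpoint(...)` with `breakTheIteration = true` and `retrieveLocalState(...)` with `false`. Snapshots are simplified to `String`s, the map is assumed to be sorted by checkpoint ID, and the removed entries are returned instead of being discarded asynchronously; the class name `SharedPruning` is an assumption.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch of the shared pruning helper.
class SharedPruning {
    final SortedMap<Long, String> storedByCheckpointId = new TreeMap<>();

    /**
     * Removes entries whose ID differs from checkpointId and returns them for discarding.
     * With breakTheIteration, iteration stops at the first entry equal to checkpointId.
     */
    List<Map.Entry<Long, String>> pruneCheckpoints(long checkpointId, boolean breakTheIteration) {
        List<Map.Entry<Long, String>> toRemove = new ArrayList<>();
        Iterator<Map.Entry<Long, String>> entryIterator = storedByCheckpointId.entrySet().iterator();
        while (entryIterator.hasNext()) {
            Map.Entry<Long, String> snapshotEntry = entryIterator.next();
            long entryCheckpointId = snapshotEntry.getKey();
            if (entryCheckpointId != checkpointId) {
                // copy first: after remove(), a TreeMap entry object may end up holding another mapping
                toRemove.add(new AbstractMap.SimpleImmutableEntry<Long, String>(snapshotEntry));
                entryIterator.remove();
            } else if (breakTheIteration) {
                break;
            }
        }
        return toRemove;
    }
}
```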
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170948561

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -285,4 +316,9 @@ public String toString() {
             ", localRecoveryConfig=" + localRecoveryConfig +
             '}';
     }
+
+    @VisibleForTesting
+    void setRetrieveWithDiscard(@Nonnull boolean retrieveWithDiscard) {
--- End diff --

Remove `@Nonnull`.

---
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170946385

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
@@ -90,6 +92,10 @@
     @GuardedBy("lock")
     private boolean disposed;
+    /** Whether to discard the useless state when retrieve local checkpoint state. */
+    @Nonnull
+    private boolean retrieveWithDiscard;
--- End diff --

Why not make this a general default?

---