[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5578


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-28 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171181196
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -90,6 +92,9 @@
@GuardedBy("lock")
private boolean disposed;
 
+   /** Whether to discard the useless state when retrieve local checkpoint 
state. */
+   private boolean retrieveWithDiscard = true;
--- End diff --

Agreed and I prefer the second option.


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-28 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171179946
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -90,6 +92,9 @@
@GuardedBy("lock")
private boolean disposed;
 
+   /** Whether to discard the useless state when retrieve local checkpoint 
state. */
+   private boolean retrieveWithDiscard = true;
--- End diff --

Then there are two better options in my opinion, because the flag is pure 
boilerplate:

- Change the test to check what we are doing now, because that is what 
happens in the real use-case.
- Maybe even better: split the method `retrieveLocalState` further: one 
method for pruning, one package-private method that does all the pure 
retrieval, logging, and `null` transformation. In the old `retrieveLocalState`, 
do the cleanup first, then the pure retrieval/logging. Call the package-private 
method in the test.

Maybe the test should then also just do both?
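
A minimal sketch of the second option, under stated assumptions: `SplitRetrievalSketch`, `retrieveLocalStateInternal`, `pruneAllExcept`, and `size` are hypothetical names, and snapshots are plain `String`s rather than `TaskStateSnapshot`. It only illustrates the proposed split (cleanup first, then pure retrieval) and is not the code that was merged.

```java
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical stand-in for TaskLocalStateStoreImpl; snapshots are plain Strings here. */
final class SplitRetrievalSketch {

    private final Object lock = new Object();

    // Guarded by lock; keyed by checkpoint id.
    private final SortedMap<Long, String> storedStateByCheckpointId = new TreeMap<>();

    void storeLocalState(long checkpointId, String state) {
        synchronized (lock) {
            storedStateByCheckpointId.put(checkpointId, state);
        }
    }

    /** Entry point used in production: do the cleanup first, then the pure retrieval. */
    String retrieveLocalState(long checkpointId) {
        pruneAllExcept(checkpointId);
        return retrieveLocalStateInternal(checkpointId);
    }

    /** Package-private so that a test can retrieve state without triggering the pruning. */
    String retrieveLocalStateInternal(long checkpointId) {
        synchronized (lock) {
            // Returns null when nothing is stored for this checkpoint id.
            return storedStateByCheckpointId.get(checkpointId);
        }
    }

    /** Drops every stored entry whose id differs from the requested checkpoint id. */
    private void pruneAllExcept(long checkpointId) {
        synchronized (lock) {
            storedStateByCheckpointId.keySet().removeIf(id -> id != checkpointId);
        }
    }

    int size() {
        synchronized (lock) {
            return storedStateByCheckpointId.size();
        }
    }
}
```

With this shape no test-only flag is needed: a test can call the package-private method to check what is stored, and call `retrieveLocalState` to check that the other entries are pruned.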


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-28 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171178975
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -300,6 +295,34 @@ private void deleteDirectory(File directory) throws 
IOException {
}
}
 
+   /**
+* Pruning the useless checkpoints, it should be called only when 
holding the {@link #lock}.
+*/
+   private void pruneCheckpoints(Predicate<Long> pruningChecker, boolean breakOnceCheckerFalse) {
--- End diff --

👍 I prefer to use `LongPredicate`, addressing ...


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-28 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171177343
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -300,6 +295,34 @@ private void deleteDirectory(File directory) throws 
IOException {
}
}
 
+   /**
+* Pruning the useless checkpoints, it should be called only when 
holding the {@link #lock}.
+*/
+   private void pruneCheckpoints(Predicate<Long> pruningChecker, boolean breakOnceCheckerFalse) {
--- End diff --

We can use `LongPredicate` instead of `Predicate`
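
As an illustration of this suggestion, here is a rough sketch of a pruning helper that takes a `java.util.function.LongPredicate`. The parameter names `pruningChecker` and `breakOnceCheckerFalse` come from the diff; the class name, the static signature, the `String` values, and the returned list are assumptions made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.function.LongPredicate;

final class LongPredicatePruningSketch {

    /**
     * Removes every entry whose checkpoint id matches the predicate and
     * returns the removed ids in iteration (ascending) order.
     */
    static List<Long> pruneCheckpoints(
            SortedMap<Long, String> stored,
            LongPredicate pruningChecker,
            boolean breakOnceCheckerFalse) {

        List<Long> removed = new ArrayList<>();
        Iterator<Map.Entry<Long, String>> entryIterator = stored.entrySet().iterator();

        while (entryIterator.hasNext()) {
            long entryCheckpointId = entryIterator.next().getKey();

            if (pruningChecker.test(entryCheckpointId)) {
                removed.add(entryCheckpointId);
                entryIterator.remove();
            } else if (breakOnceCheckerFalse) {
                // The map is sorted, so callers can ask to stop at the first non-match.
                break;
            }
        }
        return removed;
    }
}
```

A caller confirming a checkpoint might pass `id -> id < confirmedId` with `true`, while a retrieving caller might pass `id -> id != checkpointId` with `false`. `LongPredicate.test` takes a primitive `long`, so the predicate itself does no boxing.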


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-28 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171177186
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws 
IOException {
}
}
 
+   /**
+* Pruning the useless checkpoints.
+*/
+   private void pruneCheckpoints(long checkpointID, boolean 
breakTheIteration) {
+
+   Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
+   storedTaskStateByCheckpointID.entrySet().iterator();
+
+   final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
+
+   while (entryIterator.hasNext()) {
+
+   Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next();
+   long entryCheckpointId = snapshotEntry.getKey();
+
+   if (entryCheckpointId != checkpointID) {
--- End diff --

That is fine. From my point of view, it is just one way of making the `if` 
more complex.


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171133066
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws 
IOException {
}
}
 
+   /**
+* Pruning the useless checkpoints.
+*/
+   private void pruneCheckpoints(long checkpointID, boolean 
breakTheIteration) {
+
+   Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
+   storedTaskStateByCheckpointID.entrySet().iterator();
+
+   final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
+
+   while (entryIterator.hasNext()) {
+
+   Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next();
+   long entryCheckpointId = snapshotEntry.getKey();
+
+   if (entryCheckpointId != checkpointID) {
--- End diff --

I agree with you that the breaking case looks a bit dangerous ... Maybe we 
could pass a `Predicate` for the `if` and let the caller pass that `Predicate` 
into this function. This would keep the call sites clean, and we would not 
need to pack more logic into the `if` and make it complex.


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171130767
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -159,6 +166,11 @@ public TaskStateSnapshot retrieveLocalState(long 
checkpointID) {
TaskStateSnapshot snapshot;
synchronized (lock) {
snapshot = 
storedTaskStateByCheckpointID.get(checkpointID);
+
+   if (retrieveWithDiscard) {
+   // Only the TaskStateSnapshot.checkpointID == 
checkpointID is useful, we remove the others
--- End diff --

👍 


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171130750
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws 
IOException {
}
}
 
+   /**
+* Pruning the useless checkpoints.
+*/
+   private void pruneCheckpoints(long checkpointID, boolean 
breakTheIteration) {
+
--- End diff --

👍 


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r171130718
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -90,6 +92,9 @@
@GuardedBy("lock")
private boolean disposed;
 
+   /** Whether to discard the useless state when retrieve local checkpoint 
state. */
+   private boolean retrieveWithDiscard = true;
--- End diff --

Aha, this is just for passing the existing test case in 
`TaskLocalStateStoreImplTest` ... 
```java
private void checkStoredAsExpected(List<TaskStateSnapshot> history, int off, int len) throws Exception {
    for (int i = off; i < len; ++i) {
        TaskStateSnapshot expected = history.get(i);
        Assert.assertTrue(expected == taskLocalStateStore.retrieveLocalState(i));
        Mockito.verify(expected, Mockito.never()).discardState();
    }
}
```


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170975679
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -159,6 +166,11 @@ public TaskStateSnapshot retrieveLocalState(long 
checkpointID) {
TaskStateSnapshot snapshot;
synchronized (lock) {
snapshot = 
storedTaskStateByCheckpointID.get(checkpointID);
+
+   if (retrieveWithDiscard) {
+   // Only the TaskStateSnapshot.checkpointID == 
checkpointID is useful, we remove the others
--- End diff --

Comment is no longer required.


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170975600
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws 
IOException {
}
}
 
+   /**
+* Pruning the useless checkpoints.
+*/
+   private void pruneCheckpoints(long checkpointID, boolean 
breakTheIteration) {
+
--- End diff --

I suggest adding an assertion that the thread holds `lock`, and documenting 
that this method should be called only while holding the lock.
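
One possible way to express this, sketched with hypothetical names (`LockAssertionSketch`, `pruneOlderThan`, `store`, `size`) and `String` values in place of `TaskStateSnapshot`. The "older than the confirmed id" rule is an assumption for the example; `Thread.holdsLock` is a standard JDK method.

```java
import java.util.SortedMap;
import java.util.TreeMap;

final class LockAssertionSketch {

    private final Object lock = new Object();

    // Guarded by lock; keyed by checkpoint id.
    private final SortedMap<Long, String> stored = new TreeMap<>();

    void store(long checkpointId, String state) {
        synchronized (lock) {
            stored.put(checkpointId, state);
        }
    }

    void confirmCheckpoint(long confirmedCheckpointId) {
        synchronized (lock) {
            pruneOlderThan(confirmedCheckpointId);
        }
    }

    /**
     * Removes all entries with an id strictly below the given checkpoint id.
     * Must be called only while holding {@code lock}.
     */
    private void pruneOlderThan(long checkpointId) {
        // Only checked when the JVM runs with assertions enabled (-ea).
        assert Thread.holdsLock(lock) : "pruneOlderThan must be called while holding the lock";
        // headMap is a live view, so clearing it removes the entries from the backing map.
        stored.headMap(checkpointId).clear();
    }

    int size() {
        synchronized (lock) {
            return stored.size();
        }
    }
}
```

The assertion costs nothing in production runs without `-ea`, but it fails fast in tests if a new call site forgets to take the lock.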


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170975985
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -90,6 +92,9 @@
@GuardedBy("lock")
private boolean disposed;
 
+   /** Whether to discard the useless state when retrieve local checkpoint 
state. */
+   private boolean retrieveWithDiscard = true;
--- End diff --

Why do we need this? Is there any case for not doing the cleanup?


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170982734
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws 
IOException {
}
}
 
+   /**
+* Pruning the useless checkpoints.
+*/
+   private void pruneCheckpoints(long checkpointID, boolean 
breakTheIteration) {
+
+   Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
+   storedTaskStateByCheckpointID.entrySet().iterator();
+
+   final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
+
+   while (entryIterator.hasNext()) {
+
+   Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next();
+   long entryCheckpointId = snapshotEntry.getKey();
+
+   if (entryCheckpointId != checkpointID) {
--- End diff --

On second thought, while I think this code is currently correct, the case 
with breaking looks a bit dangerous. Potentially, if the checkpoint id is not 
there, the loop would not stop and would prune ongoing checkpoints. I wonder if 
we should make the `if` a bit more complex but safer (checking that the breaking 
case never exceeds the checkpoint id). What do you think?
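
To make the concern concrete, here is a small sketch contrasting the `!=` comparison from the diff with a bounded variant. The class and method names are hypothetical, values are plain `String`s, and the bounded variant is only one reading of "never exceeds the checkpoint id".

```java
import java.util.Iterator;
import java.util.SortedMap;

final class BoundedPruningSketch {

    /** Comparison as in the diff: prune until an entry equals checkpointId, then stop. */
    static void pruneUntilEqual(SortedMap<Long, String> stored, long checkpointId) {
        Iterator<Long> ids = stored.keySet().iterator();
        while (ids.hasNext()) {
            long id = ids.next();
            if (id != checkpointId) {
                ids.remove();
            } else {
                break;
            }
        }
    }

    /** Bounded variant: prune only ids strictly below checkpointId, then stop. */
    static void pruneOlderThan(SortedMap<Long, String> stored, long checkpointId) {
        Iterator<Long> ids = stored.keySet().iterator();
        while (ids.hasNext()) {
            long id = ids.next();
            if (id < checkpointId) {
                ids.remove();
            } else {
                // Sorted iteration: every remaining id is >= checkpointId.
                break;
            }
        }
    }
}
```

With stored ids 1, 2, 5, 6 and a requested id of 4 that is not present, `pruneUntilEqual` never reaches its `break` and removes everything, including the newer entries 5 and 6. `pruneOlderThan` stops at 5 and leaves 5 and 6 in place.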


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170965807
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -90,6 +92,10 @@
@GuardedBy("lock")
private boolean disposed;
 
+   /** Whether to discard the useless state when retrieve local checkpoint 
state. */
+   @Nonnull
+   private boolean retrieveWithDiscard;
--- End diff --

addressed.


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170965699
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -285,4 +316,9 @@ public String toString() {
", localRecoveryConfig=" + localRecoveryConfig +
'}';
}
+
+   @VisibleForTesting
+   void setRetrieveWithDiscard(@Nonnull boolean retrieveWithDiscard) {
--- End diff --

addressed.


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170965755
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -126,30 +133,54 @@ public void storeLocalState(
LOG.info("Storing local state for checkpoint {}.", 
checkpointId);
LOG.debug("Local state for checkpoint {} is {}.", checkpointId, 
localState);
 
-   Map toDiscard = new HashMap<>(16);
+   Map.Entry toDiscard = null;
 
synchronized (lock) {
if (disposed) {
// we ignore late stores and simply discard the 
state.
-   toDiscard.put(checkpointId, localState);
+   toDiscard = new AbstractMap.SimpleEntry(checkpointId, localState);
} else {
TaskStateSnapshot previous =

storedTaskStateByCheckpointID.put(checkpointId, localState);
 
if (previous != null) {
-   toDiscard.put(checkpointId, previous);
+   toDiscard = new 
AbstractMap.SimpleEntry(checkpointId, previous);
}
}
}
 
-   asyncDiscardLocalStateForCollection(toDiscard.entrySet());
+   if (toDiscard != null) {
+   
asyncDiscardLocalStateForCollection(Collections.singletonList(toDiscard));
+   }
}
 
@Override
@Nullable
public TaskStateSnapshot retrieveLocalState(long checkpointID) {
synchronized (lock) {
TaskStateSnapshot snapshot = 
storedTaskStateByCheckpointID.get(checkpointID);
+
+   Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
+   storedTaskStateByCheckpointID.entrySet().iterator();
+
+   if (retrieveWithDiscard) {
+   // Only the TaskStateSnapshot.checkpointID == 
checkpointID is useful, we remove the others
+   final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
--- End diff --

👍 addressed.


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170965682
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -126,30 +133,54 @@ public void storeLocalState(
LOG.info("Storing local state for checkpoint {}.", 
checkpointId);
LOG.debug("Local state for checkpoint {} is {}.", checkpointId, 
localState);
 
-   Map toDiscard = new HashMap<>(16);
+   Map.Entry toDiscard = null;
 
synchronized (lock) {
if (disposed) {
// we ignore late stores and simply discard the 
state.
-   toDiscard.put(checkpointId, localState);
+   toDiscard = new AbstractMap.SimpleEntry(checkpointId, localState);
} else {
TaskStateSnapshot previous =

storedTaskStateByCheckpointID.put(checkpointId, localState);
 
if (previous != null) {
-   toDiscard.put(checkpointId, previous);
+   toDiscard = new 
AbstractMap.SimpleEntry(checkpointId, previous);
}
}
}
 
-   asyncDiscardLocalStateForCollection(toDiscard.entrySet());
+   if (toDiscard != null) {
+   
asyncDiscardLocalStateForCollection(Collections.singletonList(toDiscard));
+   }
}
 
@Override
@Nullable
public TaskStateSnapshot retrieveLocalState(long checkpointID) {
synchronized (lock) {
TaskStateSnapshot snapshot = 
storedTaskStateByCheckpointID.get(checkpointID);
+
+   Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
--- End diff --

addressed.


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170965652
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -90,6 +92,10 @@
@GuardedBy("lock")
private boolean disposed;
 
+   /** Whether to discard the useless state when retrieve local checkpoint 
state. */
+   @Nonnull
--- End diff --

addressed.


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170943110
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -90,6 +92,10 @@
@GuardedBy("lock")
private boolean disposed;
 
+   /** Whether to discard the useless state when retrieve local checkpoint 
state. */
+   @Nonnull
--- End diff --

This annotation does not fit here: the field is a primitive `boolean`, which 
can never be `null`.


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170947390
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -126,30 +133,54 @@ public void storeLocalState(
LOG.info("Storing local state for checkpoint {}.", 
checkpointId);
LOG.debug("Local state for checkpoint {} is {}.", checkpointId, 
localState);
 
-   Map toDiscard = new HashMap<>(16);
+   Map.Entry toDiscard = null;
 
synchronized (lock) {
if (disposed) {
// we ignore late stores and simply discard the 
state.
-   toDiscard.put(checkpointId, localState);
+   toDiscard = new AbstractMap.SimpleEntry(checkpointId, localState);
} else {
TaskStateSnapshot previous =

storedTaskStateByCheckpointID.put(checkpointId, localState);
 
if (previous != null) {
-   toDiscard.put(checkpointId, previous);
+   toDiscard = new 
AbstractMap.SimpleEntry(checkpointId, previous);
}
}
}
 
-   asyncDiscardLocalStateForCollection(toDiscard.entrySet());
+   if (toDiscard != null) {
+   
asyncDiscardLocalStateForCollection(Collections.singletonList(toDiscard));
+   }
}
 
@Override
@Nullable
public TaskStateSnapshot retrieveLocalState(long checkpointID) {
synchronized (lock) {
TaskStateSnapshot snapshot = 
storedTaskStateByCheckpointID.get(checkpointID);
+
+   Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
--- End diff --

I would move all the cleanup logic into a separate method that is just 
invoked here, to separate the concerns.


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170953600
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -126,30 +133,54 @@ public void storeLocalState(
LOG.info("Storing local state for checkpoint {}.", 
checkpointId);
LOG.debug("Local state for checkpoint {} is {}.", checkpointId, 
localState);
 
-   Map toDiscard = new HashMap<>(16);
+   Map.Entry toDiscard = null;
 
synchronized (lock) {
if (disposed) {
// we ignore late stores and simply discard the 
state.
-   toDiscard.put(checkpointId, localState);
+   toDiscard = new AbstractMap.SimpleEntry(checkpointId, localState);
} else {
TaskStateSnapshot previous =

storedTaskStateByCheckpointID.put(checkpointId, localState);
 
if (previous != null) {
-   toDiscard.put(checkpointId, previous);
+   toDiscard = new 
AbstractMap.SimpleEntry(checkpointId, previous);
}
}
}
 
-   asyncDiscardLocalStateForCollection(toDiscard.entrySet());
+   if (toDiscard != null) {
+   
asyncDiscardLocalStateForCollection(Collections.singletonList(toDiscard));
+   }
}
 
@Override
@Nullable
public TaskStateSnapshot retrieveLocalState(long checkpointID) {
synchronized (lock) {
TaskStateSnapshot snapshot = 
storedTaskStateByCheckpointID.get(checkpointID);
+
+   Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
+   storedTaskStateByCheckpointID.entrySet().iterator();
+
+   if (retrieveWithDiscard) {
+   // Only the TaskStateSnapshot.checkpointID == 
checkpointID is useful, we remove the others
+   final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
--- End diff --

I think we can de-duplicate code by extracting a method with the code from 
`confirmCheckpoint(...)`, maybe called `pruneCheckpoints(...)`. We can do the 
comparison for both use-cases as `entryCheckpointId != checkpointID` and have a 
boolean parameter which determines if we break the iteration in the `else` case 
or not.


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170948561
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -285,4 +316,9 @@ public String toString() {
", localRecoveryConfig=" + localRecoveryConfig +
'}';
}
+
+   @VisibleForTesting
+   void setRetrieveWithDiscard(@Nonnull boolean retrieveWithDiscard) {
--- End diff --

remove `@Nonnull`


---


[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

2018-02-27 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5578#discussion_r170946385
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 ---
@@ -90,6 +92,10 @@
@GuardedBy("lock")
private boolean disposed;
 
+   /** Whether to discard the useless state when retrieve local checkpoint 
state. */
+   @Nonnull
+   private boolean retrieveWithDiscard;
--- End diff --

Why not make this a general default?


---