[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6257


---


[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200287796
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 
@Override
public void recycle(MemorySegment segment) {
+   BufferListener listener;
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
returnMemorySegment(segment);
+   return;
}
else {
-   BufferListener listener = registeredListeners.poll();
+   listener = registeredListeners.poll();
 
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
+   return;
}
-   else {
-   try {
-   boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-   if (needMoreBuffers) {
-   registeredListeners.add(listener);
-   }
+   }
+   }
+
+   // We do not know which locks have been acquired before the recycle() or are needed in the
+   // notification and which other threads also access them.
+   // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+   boolean success = false;
+   boolean needMoreBuffers = false;
+   try {
+   needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+   success = true;
+   } catch (Throwable ignored) {
+   // handled below, under the lock
+   }
+
+   if (!success || needMoreBuffers) {
+   synchronized (availableMemorySegments) {
+   if (isDestroyed) {
+   // cleanup tasks how they would have been done if we only had one synchronized block
+   if (needMoreBuffers) {
+   listener.notifyBufferDestroyed();
--- End diff --

👍


---


[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200287730
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 
@Override
public void recycle(MemorySegment segment) {
+   BufferListener listener;
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
returnMemorySegment(segment);
+   return;
}
else {
-   BufferListener listener = registeredListeners.poll();
+   listener = registeredListeners.poll();
 
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
+   return;
}
-   else {
-   try {
-   boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-   if (needMoreBuffers) {
-   registeredListeners.add(listener);
-   }
+   }
+   }
+
+   // We do not know which locks have been acquired before the recycle() or are needed in the
+   // notification and which other threads also access them.
+   // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+   boolean success = false;
+   boolean needMoreBuffers = false;
+   try {
+   needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+   success = true;
+   } catch (Throwable ignored) {
+   // handled below, under the lock
+   }
+
+   if (!success || needMoreBuffers) {
+   synchronized (availableMemorySegments) {
+   if (isDestroyed) {
+   // cleanup tasks how they would have been done if we only had one synchronized block
+   if (needMoreBuffers) {
+   listener.notifyBufferDestroyed();
}
-   catch (Throwable ignored) {
-   availableMemorySegments.add(segment);
-   availableMemorySegments.notify();
--- End diff --

👍


---


[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200283214
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 
@Override
public void recycle(MemorySegment segment) {
+   BufferListener listener;
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
returnMemorySegment(segment);
+   return;
}
else {
-   BufferListener listener = registeredListeners.poll();
+   listener = registeredListeners.poll();
 
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
+   return;
}
-   else {
-   try {
-   boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-   if (needMoreBuffers) {
-   registeredListeners.add(listener);
-   }
+   }
+   }
+
+   // We do not know which locks have been acquired before the recycle() or are needed in the
+   // notification and which other threads also access them.
+   // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+   boolean success = false;
+   boolean needMoreBuffers = false;
+   try {
+   needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+   success = true;
+   } catch (Throwable ignored) {
+   // handled below, under the lock
+   }
+
+   if (!success || needMoreBuffers) {
+   synchronized (availableMemorySegments) {
+   if (isDestroyed) {
+   // cleanup tasks how they would have been done if we only had one synchronized block
+   if (needMoreBuffers) {
+   listener.notifyBufferDestroyed();
--- End diff --

actually, let's do this in a follow-up PR


---


[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200274433
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 
@Override
public void recycle(MemorySegment segment) {
+   BufferListener listener;
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
returnMemorySegment(segment);
+   return;
}
else {
-   BufferListener listener = registeredListeners.poll();
+   listener = registeredListeners.poll();
 
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
+   return;
}
-   else {
-   try {
-   boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-   if (needMoreBuffers) {
-   registeredListeners.add(listener);
-   }
+   }
+   }
+
+   // We do not know which locks have been acquired before the recycle() or are needed in the
+   // notification and which other threads also access them.
+   // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+   boolean success = false;
+   boolean needMoreBuffers = false;
+   try {
+   needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+   success = true;
+   } catch (Throwable ignored) {
+   // handled below, under the lock
+   }
+
+   if (!success || needMoreBuffers) {
+   synchronized (availableMemorySegments) {
+   if (isDestroyed) {
+   // cleanup tasks how they would have been done if we only had one synchronized block
+   if (needMoreBuffers) {
+   listener.notifyBufferDestroyed();
--- End diff --

true, this ~could~ should be done outside the lock as well
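A minimal sketch of what moving the `notifyBufferDestroyed()` call out of the lock could look like: the decision is taken while holding the lock, and the callback runs only after the lock has been released. `DestroyListener` and `DestroyablePool` are hypothetical stand-ins, not Flink's `BufferListener` or `LocalBufferPool`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener type, loosely modelled on BufferListener#notifyBufferDestroyed().
interface DestroyListener {
    void notifyDestroyed();
}

// Hypothetical pool: state changes happen under 'lock', callbacks run after releasing it.
final class DestroyablePool {
    private final Object lock = new Object();
    private final List<DestroyListener> listeners = new ArrayList<>();
    private boolean destroyed;

    /** Registers the listener, or notifies it right away (outside the lock) if already destroyed. */
    void register(DestroyListener listener) {
        boolean alreadyDestroyed;
        synchronized (lock) {
            alreadyDestroyed = destroyed;
            if (!alreadyDestroyed) {
                listeners.add(listener);
            }
        }
        if (alreadyDestroyed) {
            listener.notifyDestroyed(); // callback without holding 'lock'
        }
    }

    void destroy() {
        List<DestroyListener> toNotify;
        synchronized (lock) {
            destroyed = true;
            toNotify = new ArrayList<>(listeners);
            listeners.clear();
        }
        for (DestroyListener listener : toNotify) {
            listener.notifyDestroyed(); // also outside the lock
        }
    }

    boolean isLockHeldByCurrentThread() {
        return Thread.holdsLock(lock);
    }
}
```

The cost of this pattern is that the callback may observe a pool that has changed again since the decision was made, so listeners must tolerate such late notifications.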


---


[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200274038
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 
@Override
public void recycle(MemorySegment segment) {
+   BufferListener listener;
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
returnMemorySegment(segment);
+   return;
}
else {
-   BufferListener listener = registeredListeners.poll();
+   listener = registeredListeners.poll();
 
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
+   return;
}
-   else {
-   try {
-   boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-   if (needMoreBuffers) {
-   registeredListeners.add(listener);
-   }
+   }
+   }
+
+   // We do not know which locks have been acquired before the recycle() or are needed in the
+   // notification and which other threads also access them.
+   // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+   boolean success = false;
+   boolean needMoreBuffers = false;
+   try {
+   needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+   success = true;
+   } catch (Throwable ignored) {
+   // handled below, under the lock
+   }
+
+   if (!success || needMoreBuffers) {
+   synchronized (availableMemorySegments) {
+   if (isDestroyed) {
+   // cleanup tasks how they would have been done if we only had one synchronized block
+   if (needMoreBuffers) {
+   listener.notifyBufferDestroyed();
}
-   catch (Throwable ignored) {
-   availableMemorySegments.add(segment);
-   availableMemorySegments.notify();
--- End diff --

true, that's why I created FLINK-9755 for this issue and already have code 
(have to add tests though) - expect a PR soon


---


[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200252253
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 
@Override
public void recycle(MemorySegment segment) {
+   BufferListener listener;
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
returnMemorySegment(segment);
+   return;
}
else {
-   BufferListener listener = registeredListeners.poll();
+   listener = registeredListeners.poll();
 
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
+   return;
}
-   else {
-   try {
-   boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-   if (needMoreBuffers) {
-   registeredListeners.add(listener);
-   }
+   }
+   }
+
+   // We do not know which locks have been acquired before the recycle() or are needed in the
+   // notification and which other threads also access them.
+   // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+   boolean success = false;
+   boolean needMoreBuffers = false;
+   try {
+   needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+   success = true;
+   } catch (Throwable ignored) {
+   // handled below, under the lock
+   }
+
+   if (!success || needMoreBuffers) {
+   synchronized (availableMemorySegments) {
+   if (isDestroyed) {
+   // cleanup tasks how they would have been done if we only had one synchronized block
+   if (needMoreBuffers) {
+   listener.notifyBufferDestroyed();
}
-   catch (Throwable ignored) {
-   availableMemorySegments.add(segment);
-   availableMemorySegments.notify();
--- End diff --

I am wondering whether we should rethrow this exception at the end, after the 
handling below.

For example: if the `isWaitingForFloatingBuffers` flag is inconsistent during 
`RemoteInputChannel#notifyBufferAvailable`, we should throw this exception to 
trigger a failover; otherwise, we cannot find the potential bug.
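A sketch of the suggested handling, assuming the cleanup (giving the segment back to the pool) must still happen under the lock: the `Throwable` from the callback is remembered, the cleanup runs, and the error is rethrown only after the lock has been released. `RethrowAfterCleanup` and its counter are hypothetical. Note that a rethrown exception propagates to whoever called `recycle()`, which in the deadlock scenario from the PR description is the task canceler thread rather than the task thread owning the channel.

```java
// Hypothetical sketch, not Flink's LocalBufferPool: remember the callback's Throwable,
// do the cleanup under the lock, then rethrow after the lock has been released.
final class RethrowAfterCleanup {
    private final Object lock = new Object();
    private int returnedSegments; // stand-in for "segment given back to the pool"

    void notifyAndRethrow(Runnable callback) {
        Throwable failure = null;
        try {
            callback.run(); // stand-in for listener.notifyBufferAvailable(...)
        } catch (Throwable t) {
            failure = t;
        }
        if (failure != null) {
            synchronized (lock) {
                returnedSegments++; // cleanup under the lock, as in the diff
            }
            rethrow(failure); // outside the lock; propagates to the caller of notifyAndRethrow()
        }
    }

    int returnedSegments() {
        synchronized (lock) {
            return returnedSegments;
        }
    }

    private static void rethrow(Throwable t) {
        if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        }
        if (t instanceof Error) {
            throw (Error) t;
        }
        throw new RuntimeException(t); // checked exceptions get wrapped
    }
}
```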


---


[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200251500
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 
@Override
public void recycle(MemorySegment segment) {
+   BufferListener listener;
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
returnMemorySegment(segment);
+   return;
}
else {
-   BufferListener listener = registeredListeners.poll();
+   listener = registeredListeners.poll();
 
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
+   return;
}
-   else {
-   try {
-   boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-   if (needMoreBuffers) {
-   registeredListeners.add(listener);
-   }
+   }
+   }
+
+   // We do not know which locks have been acquired before the recycle() or are needed in the
+   // notification and which other threads also access them.
+   // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+   boolean success = false;
+   boolean needMoreBuffers = false;
+   try {
+   needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+   success = true;
+   } catch (Throwable ignored) {
+   // handled below, under the lock
+   }
+
+   if (!success || needMoreBuffers) {
+   synchronized (availableMemorySegments) {
+   if (isDestroyed) {
+   // cleanup tasks how they would have been done if we only had one synchronized block
+   if (needMoreBuffers) {
+   listener.notifyBufferDestroyed();
--- End diff --

Currently, `notifyBufferDestroyed` does nothing, but we should be careful if we 
implement this method in the future in a way similar to `notifyBufferAvailable`, 
since it is still called while holding the lock.
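One way to be careful here is a defensive check: a callback that is about to take its own lock can verify with `Thread.holdsLock(Object)` that it is not being invoked while the pool's lock is held. The sketch below uses hypothetical plain monitor objects standing in for `LocalBufferPool#availableMemorySegments` and `RemoteInputChannel#bufferQueue`, and follows the ordering rule from this PR's commit message (channel queue lock first, pool lock second). It is not how Flink enforces the rule; the commit only enforces it implicitly.

```java
// Hypothetical sketch, not Flink code: plain monitors stand in for the real locks.
final class LockOrder {
    static final Object poolLock = new Object();  // ~ LocalBufferPool#availableMemorySegments
    static final Object queueLock = new Object(); // ~ RemoteInputChannel#bufferQueue

    /** Channel side: queue lock first, then (when recycling a floating buffer) the pool lock. */
    static void recycleExclusiveBuffer() {
        synchronized (queueLock) {
            synchronized (poolLock) {
                // return the floating buffer to the pool
            }
        }
    }

    /** Listener callback: must never take the queue lock while the pool lock is held. */
    static void notifyBufferAvailable() {
        if (Thread.holdsLock(poolLock)) {
            throw new IllegalStateException("queue lock requested while holding the pool lock");
        }
        synchronized (queueLock) {
            // hand the buffer over to the channel
        }
    }
}
```

A check like this only fires on the thread that actually violates the order, so it turns a rare deadlock into an immediate, reproducible failure in tests.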


---


[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-04 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/6257

[FLINK-9676][network] clarify contracts of 
BufferListener#notifyBufferAvailable() and fix a deadlock

## What is the purpose of the change

A deadlock would occur in the following situation: exclusive buffers of a 
`RemoteInputChannel` are recycled while, concurrently, other (floating) buffers 
are recycled to the buffer pool; the `RemoteInputChannel` is registered as a 
listener of that buffer pool; and adding the exclusive buffer triggers a 
floating buffer to be recycled back to the same buffer pool. The two threads 
then hold the locks on `LocalBufferPool#availableMemorySegments` and 
`RemoteInputChannel#bufferQueue`, respectively, and each tries to acquire the 
other one, i.e. they acquire the locks in reverse order.

One such instance would be (thanks @zhijiangW for finding this):

```
Task canceler thread -> RemoteInputChannel1#releaseAllResources -> recycle floating buffers
 -> lock(LocalBufferPool#availableMemorySegments) -> RemoteInputChannel2#notifyBufferAvailable
 -> try to lock(RemoteInputChannel2#bufferQueue)
```
```
Task thread -> RemoteInputChannel2#recycle
 -> lock(RemoteInputChannel2#bufferQueue) -> bufferQueue#addExclusiveBuffer -> floatingBuffer#recycleBuffer
 -> try to lock(LocalBufferPool#availableMemorySegments)
```
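The two call chains above acquire the same pair of locks in opposite order. The following self-contained sketch reproduces that lock-order inversion in miniature. It is not Flink code: the names are hypothetical, and it swaps Flink's `synchronized` blocks for `java.util.concurrent.locks.ReentrantLock` only so that `tryLock()` can time out instead of hanging forever. Each thread takes its first lock, waits until the other thread holds its own first lock, and then tries the second one.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo, not Flink code. ReentrantLock replaces synchronized blocks
// only so that tryLock() can give up instead of blocking forever.
final class LockOrderInversionDemo {

    static final ReentrantLock poolLock = new ReentrantLock();  // ~ LocalBufferPool#availableMemorySegments
    static final ReentrantLock queueLock = new ReentrantLock(); // ~ RemoteInputChannel2#bufferQueue

    /** Returns true if neither thread got its second lock, i.e. plain lock() calls would deadlock. */
    static boolean wouldDeadlock() {
        CountDownLatch holdingFirst = new CountDownLatch(2);
        CountDownLatch doneTrying = new CountDownLatch(2);
        AtomicBoolean cancelerGotQueueLock = new AtomicBoolean();
        AtomicBoolean taskGotPoolLock = new AtomicBoolean();

        // Task canceler thread: pool lock, then (inside notifyBufferAvailable) the queue lock.
        Thread canceler = new Thread(() ->
                lockInOrder(poolLock, queueLock, holdingFirst, doneTrying, cancelerGotQueueLock));
        // Task thread: queue lock, then (inside recycleBuffer) the pool lock.
        Thread task = new Thread(() ->
                lockInOrder(queueLock, poolLock, holdingFirst, doneTrying, taskGotPoolLock));

        canceler.start();
        task.start();
        try {
            canceler.join();
            task.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return !cancelerGotQueueLock.get() && !taskGotPoolLock.get();
    }

    private static void lockInOrder(ReentrantLock first, ReentrantLock second,
            CountDownLatch holdingFirst, CountDownLatch doneTrying, AtomicBoolean gotSecond) {
        first.lock();
        try {
            holdingFirst.countDown();
            holdingFirst.await(); // both threads now hold their first lock
            boolean acquired = second.tryLock(100, TimeUnit.MILLISECONDS);
            if (acquired) {
                second.unlock();
            }
            gotSecond.set(acquired);
            doneTrying.countDown();
            doneTrying.await(); // keep the first lock until the other thread has given up, too
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            first.unlock();
        }
    }

    public static void main(String[] args) {
        // prints "deadlock with plain locks: true"
        System.out.println("deadlock with plain locks: " + wouldDeadlock());
    }
}
```

Because each thread keeps its first lock until both `tryLock()` attempts have finished, neither attempt can succeed, which is exactly the situation in which two `synchronized` blocks would wait on each other forever.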

This PR is a second approach to #6254. It solves the deadlock on the 
`LocalBufferPool` side, because the other solution turned out to be even more 
complex than what's already in the PR (I'll update that PR in a second).
@pnowojski and @tillrohrmann, can you also have a quick look so that this 
can get into 1.5.1?

## Brief change log

- clarify the contract of `BufferListener#notifyBufferAvailable()` (see the 
code)
- make sure that `LocalBufferPool#recycle()` does not break this contract, 
i.e. call the listener's callback outside the lock around 
`LocalBufferPool#availableMemorySegments`
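The second bullet can be sketched with a heavily simplified pool that mirrors the structure of the diff: the listener is polled under the lock, the callback runs after the lock is released, and a failed or "needs more buffers" result is handled by re-acquiring the lock. `SimplePool` and the use of a `Predicate<Object>` as listener are assumptions for illustration; there is no destroy logic and no real `MemorySegment`.

```java
import java.util.ArrayDeque;
import java.util.function.Predicate;

// Hypothetical, heavily simplified pool (not Flink's LocalBufferPool): segments are plain
// Objects, and a listener's boolean result plays the role of "needMoreBuffers".
final class SimplePool {
    private final Object lock = new Object();
    private final ArrayDeque<Object> available = new ArrayDeque<>();
    private final ArrayDeque<Predicate<Object>> listeners = new ArrayDeque<>();

    void addListener(Predicate<Object> listener) {
        synchronized (lock) {
            listeners.add(listener);
        }
    }

    int availableCount() {
        synchronized (lock) {
            return available.size();
        }
    }

    boolean isLockHeldByCurrentThread() {
        return Thread.holdsLock(lock);
    }

    void recycle(Object segment) {
        Predicate<Object> listener;
        synchronized (lock) {
            listener = listeners.poll();
            if (listener == null) {
                available.add(segment);
                lock.notify(); // wake up a thread waiting for a segment
                return;
            }
        }

        // The callback runs WITHOUT the pool lock: it may lock e.g. a channel's queue,
        // whose lock the caller might already hold (the FLINK-9676 scenario).
        boolean success = false;
        boolean needMore = false;
        try {
            needMore = listener.test(segment);
            success = true;
        } catch (Throwable ignored) {
            // handled below, under the lock
        }

        if (!success || needMore) {
            synchronized (lock) {
                if (needMore) {
                    listeners.add(listener); // notify this listener again next time
                }
                if (!success) {
                    available.add(segment); // callback failed: keep the segment in the pool
                    lock.notify();
                }
            }
        }
    }
}
```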

## Verifying this change

This change added tests and can be verified as follows:

- added `RemoteInputChannelTest#testConcurrentRecycleAndRelease2`, which 
catches this deadlock quite quickly
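A deadlock like this can be caught by a test that runs the two racing actions repeatedly and fails if they do not finish within a timeout. The harness below is a hypothetical sketch, not the actual `testConcurrentRecycleAndRelease2`: it uses `Future#get(long, TimeUnit)` for the timeout and `ThreadMXBean#findDeadlockedThreads()` to ask the JVM whether threads are deadlocked. Worker threads are daemons so that a deadlocked test does not keep the JVM alive.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical test helper, not part of Flink.
final class ConcurrentRunner {

    /** Runs a and b concurrently 'rounds' times; returns false if any round misses the timeout. */
    static boolean finishesInTime(Runnable a, Runnable b, int rounds, long timeoutMillis) {
        ExecutorService executor = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true); // a deadlocked worker must not keep the JVM alive
            return thread;
        });
        try {
            for (int i = 0; i < rounds; i++) {
                Future<?> first = executor.submit(a);
                Future<?> second = executor.submit(b);
                first.get(timeoutMillis, TimeUnit.MILLISECONDS);
                second.get(timeoutMillis, TimeUnit.MILLISECONDS);
            }
            return true;
        } catch (TimeoutException e) {
            return false; // most likely a deadlock (or a very slow machine)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("action failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    /** Asks the JVM whether any threads are deadlocked on monitors or ownable synchronizers. */
    static boolean jvmReportsDeadlock() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        return threads.findDeadlockedThreads() != null;
    }
}
```

A timeout alone can also fire on an overloaded machine, so pairing it with `findDeadlockedThreads()` helps to tell a real deadlock from a slow run.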

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no** (per 
buffer, but we only move the listener callback out of the synchronized block, 
so if there is any effect, it should be positive)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **JavaDocs**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-9676-lbp

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6257.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6257


commit d69ff968f8647efa13adcc6f338e411675b36d68
Author: Nico Kruber 
Date:   2018-07-04T15:45:18Z

[FLINK-9676][network] clarify contracts of 
BufferListener#notifyBufferAvailable() and fix a deadlock

When recycling exclusive buffers of a RemoteInputChannel and recycling
(other/floating) buffers to the buffer pool concurrently while the
RemoteInputChannel is registered as a listener to the buffer pool and adding
the exclusive buffer triggers a floating buffer to be recycled back to the same
buffer pool, a deadlock would occur holding locks on
LocalBufferPool#availableMemorySegments and RemoteInputChannel#bufferQueue but
acquiring them in reverse order.

One such instance would be:

Task canceler thread -> RemoteInputChannel1#releaseAllResources -> recycle floating buffers
-> lock(LocalBufferPool#availableMemorySegments) -> RemoteInputChannel2#notifyBufferAvailable
-> try to lock(RemoteInputChannel2#bufferQueue)

Task thread -> RemoteInputChannel2#recycle
-> lock(RemoteInputChannel2#bufferQueue) -> bufferQueue#addExclusiveBuffer -> floatingBuffer#recycleBuffer
-> try to lock(LocalBufferPool#availableMemorySegments)

Therefore, we decouple the listener callback from the lock around
LocalBufferPool#availableMemorySegments and implicitly enforce that
RemoteInputChannel2#bufferQueue takes precedence over this lock, i.e. it must
be acquired first and should never be taken after having locked on
LocalBufferPool#availableMemorySegments.




---