[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6257 ---
[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6257#discussion_r200287796

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
     @Override
     public void recycle(MemorySegment segment) {
+        BufferListener listener;
         synchronized (availableMemorySegments) {
             if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
                 returnMemorySegment(segment);
+                return;
             }
             else {
-                BufferListener listener = registeredListeners.poll();
+                listener = registeredListeners.poll();
                 if (listener == null) {
                     availableMemorySegments.add(segment);
                     availableMemorySegments.notify();
+                    return;
                 }
-                else {
-                    try {
-                        boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-                        if (needMoreBuffers) {
-                            registeredListeners.add(listener);
-                        }
+            }
+        }
+
+        // We do not know which locks have been acquired before the recycle() or are needed in the
+        // notification and which other threads also access them.
+        // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+        boolean success = false;
+        boolean needMoreBuffers = false;
+        try {
+            needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+            success = true;
+        } catch (Throwable ignored) {
+            // handled below, under the lock
+        }
+
+        if (!success || needMoreBuffers) {
+            synchronized (availableMemorySegments) {
+                if (isDestroyed) {
+                    // cleanup tasks how they would have been done if we only had one synchronized block
+                    if (needMoreBuffers) {
+                        listener.notifyBufferDestroyed();
--- End diff --

👍

---
[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6257#discussion_r200287730

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
     @Override
     public void recycle(MemorySegment segment) {
+        BufferListener listener;
         synchronized (availableMemorySegments) {
             if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
                 returnMemorySegment(segment);
+                return;
             }
             else {
-                BufferListener listener = registeredListeners.poll();
+                listener = registeredListeners.poll();
                 if (listener == null) {
                     availableMemorySegments.add(segment);
                     availableMemorySegments.notify();
+                    return;
                 }
-                else {
-                    try {
-                        boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-                        if (needMoreBuffers) {
-                            registeredListeners.add(listener);
-                        }
+            }
+        }
+
+        // We do not know which locks have been acquired before the recycle() or are needed in the
+        // notification and which other threads also access them.
+        // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+        boolean success = false;
+        boolean needMoreBuffers = false;
+        try {
+            needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+            success = true;
+        } catch (Throwable ignored) {
+            // handled below, under the lock
+        }
+
+        if (!success || needMoreBuffers) {
+            synchronized (availableMemorySegments) {
+                if (isDestroyed) {
+                    // cleanup tasks how they would have been done if we only had one synchronized block
+                    if (needMoreBuffers) {
+                        listener.notifyBufferDestroyed();
                     }
-                    catch (Throwable ignored) {
-                        availableMemorySegments.add(segment);
-                        availableMemorySegments.notify();
--- End diff --

👍

---
[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6257#discussion_r200283214

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
     @Override
     public void recycle(MemorySegment segment) {
+        BufferListener listener;
         synchronized (availableMemorySegments) {
             if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
                 returnMemorySegment(segment);
+                return;
             }
             else {
-                BufferListener listener = registeredListeners.poll();
+                listener = registeredListeners.poll();
                 if (listener == null) {
                     availableMemorySegments.add(segment);
                     availableMemorySegments.notify();
+                    return;
                 }
-                else {
-                    try {
-                        boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-                        if (needMoreBuffers) {
-                            registeredListeners.add(listener);
-                        }
+            }
+        }
+
+        // We do not know which locks have been acquired before the recycle() or are needed in the
+        // notification and which other threads also access them.
+        // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+        boolean success = false;
+        boolean needMoreBuffers = false;
+        try {
+            needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+            success = true;
+        } catch (Throwable ignored) {
+            // handled below, under the lock
+        }
+
+        if (!success || needMoreBuffers) {
+            synchronized (availableMemorySegments) {
+                if (isDestroyed) {
+                    // cleanup tasks how they would have been done if we only had one synchronized block
+                    if (needMoreBuffers) {
+                        listener.notifyBufferDestroyed();
--- End diff --

actually, let's do this in a follow-up PR

---
[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6257#discussion_r200274433

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
     @Override
     public void recycle(MemorySegment segment) {
+        BufferListener listener;
         synchronized (availableMemorySegments) {
             if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
                 returnMemorySegment(segment);
+                return;
             }
             else {
-                BufferListener listener = registeredListeners.poll();
+                listener = registeredListeners.poll();
                 if (listener == null) {
                     availableMemorySegments.add(segment);
                     availableMemorySegments.notify();
+                    return;
                 }
-                else {
-                    try {
-                        boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-                        if (needMoreBuffers) {
-                            registeredListeners.add(listener);
-                        }
+            }
+        }
+
+        // We do not know which locks have been acquired before the recycle() or are needed in the
+        // notification and which other threads also access them.
+        // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+        boolean success = false;
+        boolean needMoreBuffers = false;
+        try {
+            needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+            success = true;
+        } catch (Throwable ignored) {
+            // handled below, under the lock
+        }
+
+        if (!success || needMoreBuffers) {
+            synchronized (availableMemorySegments) {
+                if (isDestroyed) {
+                    // cleanup tasks how they would have been done if we only had one synchronized block
+                    if (needMoreBuffers) {
+                        listener.notifyBufferDestroyed();
--- End diff --

true, this ~could~ should be done outside the lock as well

---
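Editorial sketch of the point made in this comment: one way to move a "destroyed" notification out of the lock is to take the decision under the lock and run the callback only after releasing it. This is an illustration under assumptions, not the follow-up change itself. `DestroyNotifySketch`, its `Listener` interface, `onDestroyed()` and `reRegisterOrNotify()` are hypothetical names and are not part of Flink.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical, simplified pool; not Flink's LocalBufferPool. */
class DestroyNotifySketch {
    /** Hypothetical listener with a single "pool is gone" callback. */
    interface Listener {
        void onDestroyed();
    }

    final Object lock = new Object();
    private final Queue<Listener> listeners = new ArrayDeque<>();
    private boolean destroyed;

    void destroy() {
        synchronized (lock) {
            destroyed = true;
        }
    }

    int listenerCount() {
        synchronized (lock) {
            return listeners.size();
        }
    }

    /** Re-registers the listener, or tells it that the pool was destroyed. */
    void reRegisterOrNotify(Listener listener) {
        boolean notifyDestroyed;
        synchronized (lock) {
            // Only the decision and the queue update happen under the lock.
            notifyDestroyed = destroyed;
            if (!destroyed) {
                listeners.add(listener);
            }
        }
        if (notifyDestroyed) {
            // The callback runs without the pool lock, so it may take its own locks.
            listener.onDestroyed();
        }
    }
}
```

The state read under the lock is copied into a local variable, so the callback acts on a consistent snapshot even though it runs after the lock is released.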
[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6257#discussion_r200274038

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
     @Override
     public void recycle(MemorySegment segment) {
+        BufferListener listener;
         synchronized (availableMemorySegments) {
             if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
                 returnMemorySegment(segment);
+                return;
             }
             else {
-                BufferListener listener = registeredListeners.poll();
+                listener = registeredListeners.poll();
                 if (listener == null) {
                     availableMemorySegments.add(segment);
                     availableMemorySegments.notify();
+                    return;
                 }
-                else {
-                    try {
-                        boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-                        if (needMoreBuffers) {
-                            registeredListeners.add(listener);
-                        }
+            }
+        }
+
+        // We do not know which locks have been acquired before the recycle() or are needed in the
+        // notification and which other threads also access them.
+        // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+        boolean success = false;
+        boolean needMoreBuffers = false;
+        try {
+            needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+            success = true;
+        } catch (Throwable ignored) {
+            // handled below, under the lock
+        }
+
+        if (!success || needMoreBuffers) {
+            synchronized (availableMemorySegments) {
+                if (isDestroyed) {
+                    // cleanup tasks how they would have been done if we only had one synchronized block
+                    if (needMoreBuffers) {
+                        listener.notifyBufferDestroyed();
                     }
-                    catch (Throwable ignored) {
-                        availableMemorySegments.add(segment);
-                        availableMemorySegments.notify();
--- End diff --

true, that's why I created FLINK-9755 for this issue and already have code (have to add tests though) - expect a PR soon

---
[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6257#discussion_r200252253

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
     @Override
     public void recycle(MemorySegment segment) {
+        BufferListener listener;
         synchronized (availableMemorySegments) {
             if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
                 returnMemorySegment(segment);
+                return;
             }
             else {
-                BufferListener listener = registeredListeners.poll();
+                listener = registeredListeners.poll();
                 if (listener == null) {
                     availableMemorySegments.add(segment);
                     availableMemorySegments.notify();
+                    return;
                 }
-                else {
-                    try {
-                        boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-                        if (needMoreBuffers) {
-                            registeredListeners.add(listener);
-                        }
+            }
+        }
+
+        // We do not know which locks have been acquired before the recycle() or are needed in the
+        // notification and which other threads also access them.
+        // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+        boolean success = false;
+        boolean needMoreBuffers = false;
+        try {
+            needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+            success = true;
+        } catch (Throwable ignored) {
+            // handled below, under the lock
+        }
+
+        if (!success || needMoreBuffers) {
+            synchronized (availableMemorySegments) {
+                if (isDestroyed) {
+                    // cleanup tasks how they would have been done if we only had one synchronized block
+                    if (needMoreBuffers) {
+                        listener.notifyBufferDestroyed();
                     }
-                    catch (Throwable ignored) {
-                        availableMemorySegments.add(segment);
-                        availableMemorySegments.notify();
--- End diff --

I am wondering whether we should rethrow this exception at the end, after the handling below.
For example, if `RemoteInputChannel#notifyBufferAvailable` finds that the `isWaitingForFloatingBuffers` flag is inconsistent, we should rethrow the exception to trigger a failover; otherwise the potential bug would go unnoticed.

---
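Editorial sketch of the suggestion above: remember the failure from the callback, put the buffer back under the lock, and only then rethrow so that the error is not silently swallowed. This is an assumption about how such handling could look, not the code from this PR or from FLINK-9755. `RethrowSketch`, `Listener`, `onBufferAvailable()` and `notifyOutsideLock()` are hypothetical names, and re-registration of the listener is omitted for brevity.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical, simplified pool; not Flink's LocalBufferPool. */
class RethrowSketch {
    /** Hypothetical listener whose callback may fail. */
    interface Listener {
        boolean onBufferAvailable(Object buffer) throws Exception;
    }

    final Object lock = new Object();
    final Queue<Object> available = new ArrayDeque<>();

    /** Calls the listener without holding the lock and surfaces any failure. */
    void notifyOutsideLock(Listener listener, Object buffer) {
        Throwable failure = null;
        try {
            listener.onBufferAvailable(buffer);
        } catch (Throwable t) {
            // Remember the failure instead of ignoring it.
            failure = t;
        }
        if (failure != null) {
            synchronized (lock) {
                // Cleanup first: the buffer goes back to the pool.
                available.add(buffer);
                lock.notify();
            }
            // Rethrow after the cleanup so that the caller can react to the error.
            throw new IllegalStateException("listener callback failed", failure);
        }
    }
}
```

Wrapping the cause in an unchecked exception is only one option; which exception type the caller should see is a design decision this sketch does not settle.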
[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6257#discussion_r200251500

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
     @Override
     public void recycle(MemorySegment segment) {
+        BufferListener listener;
         synchronized (availableMemorySegments) {
             if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
                 returnMemorySegment(segment);
+                return;
             }
             else {
-                BufferListener listener = registeredListeners.poll();
+                listener = registeredListeners.poll();
                 if (listener == null) {
                     availableMemorySegments.add(segment);
                     availableMemorySegments.notify();
+                    return;
                 }
-                else {
-                    try {
-                        boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-                        if (needMoreBuffers) {
-                            registeredListeners.add(listener);
-                        }
+            }
+        }
+
+        // We do not know which locks have been acquired before the recycle() or are needed in the
+        // notification and which other threads also access them.
+        // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+        boolean success = false;
+        boolean needMoreBuffers = false;
+        try {
+            needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+            success = true;
+        } catch (Throwable ignored) {
+            // handled below, under the lock
+        }
+
+        if (!success || needMoreBuffers) {
+            synchronized (availableMemorySegments) {
+                if (isDestroyed) {
+                    // cleanup tasks how they would have been done if we only had one synchronized block
+                    if (needMoreBuffers) {
+                        listener.notifyBufferDestroyed();
--- End diff --

Currently `notifyBufferDestroyed` does nothing, but if this method is implemented in the future, we should be as careful with it as with `notifyBufferAvailable`.

---
[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...
GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/6257

[FLINK-9676][network] clarify contracts of BufferListener#notifyBufferAvailable() and fix a deadlock

## What is the purpose of the change

When recycling exclusive buffers of a `RemoteInputChannel` and recycling (other/floating) buffers to the buffer pool concurrently while the `RemoteInputChannel` is registered as a listener to the buffer pool and adding the exclusive buffer triggers a floating buffer to be recycled back to the same buffer pool, a deadlock would occur holding locks on `LocalBufferPool#availableMemorySegments` and `RemoteInputChannel#bufferQueue` but acquiring them in reverse order.

One such instance would be (thanks @zhijiangW for finding this):

```
Task canceler thread -> RemoteInputChannel1#releaseAllResources -> recycle floating buffers -> lock(LocalBufferPool#availableMemorySegments) -> RemoteInputChannel2#notifyBufferAvailable -> try to lock(RemoteInputChannel2#bufferQueue)
```

```
Task thread -> RemoteInputChannel2#recycle -> lock(RemoteInputChannel2#bufferQueue) -> bufferQueue#addExclusiveBuffer -> floatingBuffer#recycleBuffer -> try to lock(LocalBufferPool#availableMemorySegments)
```

This PR is a second approach to #6254 and solves the deadlock on the `LocalBufferPool` side as the other solution turned out to be even more complex than what's already in the PR (I'll update that PR in a second).

@pnowojski and @tillrohrmann can you also have a quick look so that this can get into 1.5.1?

## Brief change log

  - clarify the contract of `BufferListener#notifyBufferAvailable()` (see in the code)
  - make sure that `LocalBufferPool#recycle()` does not break this contract, i.e. call the listener's callback outside the lock around `LocalBufferPool#availableMemorySegments`

## Verifying this change

This change added tests and can be verified as follows:

  - added `RemoteInputChannelTest#testConcurrentRecycleAndRelease2` which catches this deadlock quite quickly

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no** (per buffer, but we're only moving recycling out of the synchronized block so if there's any effect, it should be positive)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **JavaDocs**

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-9676-lbp

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6257.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #6257

commit d69ff968f8647efa13adcc6f338e411675b36d68
Author: Nico Kruber
Date: 2018-07-04T15:45:18Z

    [FLINK-9676][network] clarify contracts of BufferListener#notifyBufferAvailable() and fix a deadlock

    When recycling exclusive buffers of a RemoteInputChannel and recycling (other/floating) buffers to the buffer pool concurrently while the RemoteInputChannel is registered as a listener to the buffer pool and adding the exclusive buffer triggers a floating buffer to be recycled back to the same buffer pool, a deadlock would occur holding locks on LocalBufferPool#availableMemorySegments and RemoteInputChannel#bufferQueue but acquiring them in reverse order.

    One such instance would be:

    Task canceler thread -> RemoteInputChannel1#releaseAllResources -> recycle floating buffers -> lock(LocalBufferPool#availableMemorySegments) -> RemoteInputChannel2#notifyBufferAvailable -> try to lock(RemoteInputChannel2#bufferQueue)

    Task thread -> RemoteInputChannel2#recycle -> lock(RemoteInputChannel2#bufferQueue) -> bufferQueue#addExclusiveBuffer -> floatingBuffer#recycleBuffer -> try to lock(LocalBufferPool#availableMemorySegments)

    Therefore, we decouple the listener callback from the lock around LocalBufferPool#availableMemorySegments and implicitly enforce that RemoteInputChannel2#bufferQueue takes precedence over this lock, i.e. must be acquired first and should never be taken after having locked on LocalBufferPool#availableMemorySegments.

---
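Editorial sketch of the pattern described in the change log: a minimal Java example of polling the listener under the pool lock and invoking its callback only after the lock has been released. It illustrates the idea under simplifying assumptions (no destroyed state, no pool-size check, no exception handling) and is not the code of this PR. `SketchPool`, `Listener` and `onBufferAvailable()` are hypothetical names and are not part of Flink.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical, simplified pool; not Flink's LocalBufferPool. */
class SketchPool {
    /** Hypothetical listener; returns true if it wants to stay registered. */
    interface Listener {
        boolean onBufferAvailable(Object buffer);
    }

    final Object lock = new Object();
    private final Queue<Object> available = new ArrayDeque<>();
    private final Queue<Listener> listeners = new ArrayDeque<>();

    void register(Listener listener) {
        synchronized (lock) {
            listeners.add(listener);
        }
    }

    int availableCount() {
        synchronized (lock) {
            return available.size();
        }
    }

    void recycle(Object buffer) {
        Listener listener;
        synchronized (lock) {
            listener = listeners.poll();
            if (listener == null) {
                // Nobody is waiting: keep the buffer in the pool.
                available.add(buffer);
                lock.notify();
                return;
            }
        }
        // The pool lock is released here, so the listener may take its own locks
        // without introducing a "pool lock, then listener lock" acquisition order.
        boolean needsMore = listener.onBufferAvailable(buffer);
        if (needsMore) {
            register(listener);
        }
    }

    public static void main(String[] args) {
        SketchPool pool = new SketchPool();
        pool.register(buffer -> {
            System.out.println("pool lock held in callback: " + Thread.holdsLock(pool.lock));
            return false;
        });
        pool.recycle(new Object());
    }
}
```

With the callback outside the synchronized block, a listener that locks its own queue and then recycles into the pool always acquires the two locks in the same order, which is the property the commit message relies on.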