NicoK closed pull request #6693: [FLINK-10332][network] move data notification 
out of the synchronized block
URL: https://github.com/apache/flink/pull/6693
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 90daf75fcc7..cc0b2220fd2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -89,9 +89,7 @@
        public void addInputChannel(RemoteInputChannel listener) throws 
IOException {
                checkError();
 
-               if (!inputChannels.containsKey(listener.getInputChannelId())) {
-                       inputChannels.put(listener.getInputChannelId(), 
listener);
-               }
+               inputChannels.putIfAbsent(listener.getInputChannelId(), 
listener);
        }
 
        @Override
@@ -112,12 +110,7 @@ public void cancelRequestFor(InputChannelID 
inputChannelId) {
 
        @Override
        public void notifyCreditAvailable(final RemoteInputChannel 
inputChannel) {
-               ctx.executor().execute(new Runnable() {
-                       @Override
-                       public void run() {
-                               
ctx.pipeline().fireUserEventTriggered(inputChannel);
-                       }
-               });
+               ctx.executor().execute(() -> 
ctx.pipeline().fireUserEventTriggered(inputChannel));
        }
 
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 796e86f51b3..c5ba7a4b7f1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -85,9 +85,7 @@
        public void addInputChannel(RemoteInputChannel listener) throws 
IOException {
                checkError();
 
-               if (!inputChannels.containsKey(listener.getInputChannelId())) {
-                       inputChannels.put(listener.getInputChannelId(), 
listener);
-               }
+               inputChannels.putIfAbsent(listener.getInputChannelId(), 
listener);
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 8c05b8208f9..c3d3d1bcc10 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -89,12 +89,7 @@ void notifyReaderNonEmpty(final NetworkSequenceViewReader 
reader) {
                // TODO This could potentially have a bad performance impact as 
in the
                // worst case (network consumes faster than the producer) each 
buffer
                // will trigger a separate event loop task being scheduled.
-               ctx.executor().execute(new Runnable() {
-                       @Override
-                       public void run() {
-                               ctx.pipeline().fireUserEventTriggered(reader);
-                       }
-               });
+               ctx.executor().execute(() -> 
ctx.pipeline().fireUserEventTriggered(reader));
        }
 
        /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index d2d7fdb324b..fe27d97adaa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -88,6 +88,7 @@ public void finish() throws IOException {
        private boolean add(BufferConsumer bufferConsumer, boolean finish) {
                checkNotNull(bufferConsumer);
 
+               final boolean notifyDataAvailable;
                synchronized (buffers) {
                        if (isFinished || isReleased) {
                                bufferConsumer.close();
@@ -98,14 +99,13 @@ private boolean add(BufferConsumer bufferConsumer, boolean 
finish) {
                        buffers.add(bufferConsumer);
                        updateStatistics(bufferConsumer);
                        increaseBuffersInBacklog(bufferConsumer);
+                       notifyDataAvailable = shouldNotifyDataAvailable() || 
finish;
 
-                       if (finish) {
-                               isFinished = true;
-                               notifyDataAvailable();
-                       }
-                       else {
-                               maybeNotifyDataAvailable();
-                       }
+                       isFinished |= finish;
+               }
+
+               if (notifyDataAvailable) {
+                       notifyDataAvailable();
                }
 
                return true;
@@ -220,6 +220,7 @@ public boolean isReleased() {
 
        @Override
        public PipelinedSubpartitionView 
createReadView(BufferAvailabilityListener availabilityListener) throws 
IOException {
+               final boolean notifyDataAvailable;
                synchronized (buffers) {
                        checkState(!isReleased);
                        checkState(readView == null,
@@ -230,9 +231,10 @@ public PipelinedSubpartitionView 
createReadView(BufferAvailabilityListener avail
                                parent.getOwningTaskName(), index, 
parent.getPartitionId());
 
                        readView = new PipelinedSubpartitionView(this, 
availabilityListener);
-                       if (!buffers.isEmpty()) {
-                               notifyDataAvailable();
-                       }
+                       notifyDataAvailable = !buffers.isEmpty();
+               }
+               if (notifyDataAvailable) {
+                       notifyDataAvailable();
                }
 
                return readView;
@@ -283,26 +285,24 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
 
        @Override
        public void flush() {
+               final boolean notifyDataAvailable;
                synchronized (buffers) {
                        if (buffers.isEmpty()) {
                                return;
                        }
-                       if (!flushRequested) {
-                               flushRequested = true; // set this before the 
notification!
-                               // if there is more then 1 buffer, we already 
notified the reader
-                               // (at the latest when adding the second buffer)
-                               if (buffers.size() == 1) {
-                                       notifyDataAvailable();
-                               }
-                       }
+                       // if there is more then 1 buffer, we already notified 
the reader
+                       // (at the latest when adding the second buffer)
+                       notifyDataAvailable = !flushRequested && buffers.size() 
== 1;
+                       flushRequested = true;
+               }
+               if (notifyDataAvailable) {
+                       notifyDataAvailable();
                }
        }
 
-       private void maybeNotifyDataAvailable() {
+       private boolean shouldNotifyDataAvailable() {
                // Notify only when we added first finished buffer.
-               if (getNumberOfFinishedBuffers() == 1) {
-                       notifyDataAvailable();
-               }
+               return readView != null && !flushRequested && 
getNumberOfFinishedBuffers() == 1;
        }
 
        private void notifyDataAvailable() {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to