Repository: asterixdb
Updated Branches:
  refs/heads/master 8bf93fa70 -> 675244029


[NO ISSUE][NET] Ensure Thread Safety in FullFrameChannelReadInterface

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Use a blocking deque in FullFrameChannelReadInterface
  to ensure thread safety between the frame consumer and
  the networking thread.

Change-Id: I33f0171e49b0ff972730a678e8b61a2070dc8832
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2921
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhub...@apache.org>
Reviewed-by: Michael Blow <mb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/0b99332f
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/0b99332f
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/0b99332f

Branch: refs/heads/master
Commit: 0b99332f6c85ea32a79780dc25c0f7b6fd3b75c9
Parents: 167518f
Author: Murtadha Hubail <mhub...@apache.org>
Authored: Sat Aug 18 13:34:59 2018 -0700
Committer: Murtadha Hubail <mhub...@apache.org>
Committed: Sun Aug 19 08:42:56 2018 -0700

----------------------------------------------------------------------
 .../muxdemux/FullFrameChannelReadInterface.java   | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b99332f/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
index 049cfd8..32bf77e 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
@@ -21,8 +21,8 @@ package org.apache.hyracks.net.protocols.muxdemux;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
-import java.util.ArrayDeque;
-import java.util.Deque;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.hyracks.api.comm.IBufferFactory;
 import org.apache.hyracks.api.comm.IChannelControlBlock;
@@ -33,22 +33,20 @@ import org.apache.logging.log4j.Logger;
 public class FullFrameChannelReadInterface extends AbstractChannelReadInterface {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private final Deque<ByteBuffer> riEmptyStack;
+    private final BlockingDeque<ByteBuffer> riEmptyStack;
     private final IChannelControlBlock ccb;
 
     FullFrameChannelReadInterface(IChannelControlBlock ccb) {
         this.ccb = ccb;
-        riEmptyStack = new ArrayDeque<>();
+        riEmptyStack = new LinkedBlockingDeque<>();
         credits = 0;
 
         emptyBufferAcceptor = buffer -> {
-            int delta = buffer.remaining();
-            synchronized (ccb) {
-                if (ccb.isRemotelyClosed()) {
-                    return;
-                }
-                riEmptyStack.push(buffer);
+            if (ccb.isRemotelyClosed()) {
+                return;
             }
+            riEmptyStack.push(buffer);
+            final int delta = buffer.remaining();
             ccb.addPendingCredits(delta);
         };
     }

Reply via email to