Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2921

Change subject: [NO ISSUE][NET] Ensure Thread Safety in FullFrameChannelReadInterface
......................................................................

[NO ISSUE][NET] Ensure Thread Safety in FullFrameChannelReadInterface

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Use a blocking deque in FullFrameChannelReadInterface
  to ensure thread safety between the frame consumer and
  the networking thread.

Change-Id: I33f0171e49b0ff972730a678e8b61a2070dc8832
---
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
1 file changed, 8 insertions(+), 10 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/21/2921/1

diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
index 049cfd8..32bf77e 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
@@ -21,8 +21,8 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
-import java.util.ArrayDeque;
-import java.util.Deque;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.hyracks.api.comm.IBufferFactory;
 import org.apache.hyracks.api.comm.IChannelControlBlock;
@@ -33,22 +33,20 @@
 public class FullFrameChannelReadInterface extends 
AbstractChannelReadInterface {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private final Deque<ByteBuffer> riEmptyStack;
+    private final BlockingDeque<ByteBuffer> riEmptyStack;
     private final IChannelControlBlock ccb;
 
     FullFrameChannelReadInterface(IChannelControlBlock ccb) {
         this.ccb = ccb;
-        riEmptyStack = new ArrayDeque<>();
+        riEmptyStack = new LinkedBlockingDeque<>();
         credits = 0;
 
         emptyBufferAcceptor = buffer -> {
-            int delta = buffer.remaining();
-            synchronized (ccb) {
-                if (ccb.isRemotelyClosed()) {
-                    return;
-                }
-                riEmptyStack.push(buffer);
+            if (ccb.isRemotelyClosed()) {
+                return;
             }
+            riEmptyStack.push(buffer);
+            final int delta = buffer.remaining();
             ccb.addPendingCredits(delta);
         };
     }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2921
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I33f0171e49b0ff972730a678e8b61a2070dc8832
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: stabilization-f69489
Gerrit-Owner: Murtadha Hubail <[email protected]>

Reply via email to