Repository: asterixdb Updated Branches: refs/heads/master 8bf93fa70 -> 675244029
[NO ISSUE][NET] Ensure Thread Safety in FullFrameChannelReadInterface - user model changes: no - storage format changes: no - interface changes: no Details: - Use a blocking deque in FullFrameChannelReadInterface to ensure thread safety between frame consumer and the networking thread. Change-Id: I33f0171e49b0ff972730a678e8b61a2070dc8832 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2921 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> Reviewed-by: Michael Blow <mb...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/0b99332f Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/0b99332f Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/0b99332f Branch: refs/heads/master Commit: 0b99332f6c85ea32a79780dc25c0f7b6fd3b75c9 Parents: 167518f Author: Murtadha Hubail <mhub...@apache.org> Authored: Sat Aug 18 13:34:59 2018 -0700 Committer: Murtadha Hubail <mhub...@apache.org> Committed: Sun Aug 19 08:42:56 2018 -0700 ---------------------------------------------------------------------- .../muxdemux/FullFrameChannelReadInterface.java | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0b99332f/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java index 049cfd8..32bf77e 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java @@ -21,8 +21,8 @@ package org.apache.hyracks.net.protocols.muxdemux; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; -import java.util.ArrayDeque; -import java.util.Deque; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; import org.apache.hyracks.api.comm.IBufferFactory; import org.apache.hyracks.api.comm.IChannelControlBlock; @@ -33,22 +33,20 @@ import org.apache.logging.log4j.Logger; public class FullFrameChannelReadInterface extends AbstractChannelReadInterface { private static final Logger LOGGER = LogManager.getLogger(); - private final Deque<ByteBuffer> riEmptyStack; + private final BlockingDeque<ByteBuffer> riEmptyStack; private final IChannelControlBlock ccb; FullFrameChannelReadInterface(IChannelControlBlock ccb) { this.ccb = ccb; - riEmptyStack = new ArrayDeque<>(); + riEmptyStack = new LinkedBlockingDeque<>(); credits = 0; emptyBufferAcceptor = buffer -> { - int delta = buffer.remaining(); - synchronized (ccb) { - if (ccb.isRemotelyClosed()) { - return; - } - riEmptyStack.push(buffer); + if (ccb.isRemotelyClosed()) { + return; } + riEmptyStack.push(buffer); + final int delta = buffer.remaining(); ccb.addPendingCredits(delta); }; }