Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2921
Change subject: [NO ISSUE][NET] Ensure Thread Safety in
FullFrameChannelReadInterface
......................................................................
[NO ISSUE][NET] Ensure Thread Safety in FullFrameChannelReadInterface
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Replace the ArrayDeque in FullFrameChannelReadInterface with a
LinkedBlockingDeque to ensure thread safety between the frame
consumer and the networking thread.
Change-Id: I33f0171e49b0ff972730a678e8b61a2070dc8832
---
M
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
1 file changed, 8 insertions(+), 10 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/21/2921/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
index 049cfd8..32bf77e 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
+++
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
@@ -21,8 +21,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
-import java.util.ArrayDeque;
-import java.util.Deque;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
import org.apache.hyracks.api.comm.IBufferFactory;
import org.apache.hyracks.api.comm.IChannelControlBlock;
@@ -33,22 +33,20 @@
public class FullFrameChannelReadInterface extends
AbstractChannelReadInterface {
private static final Logger LOGGER = LogManager.getLogger();
- private final Deque<ByteBuffer> riEmptyStack;
+ private final BlockingDeque<ByteBuffer> riEmptyStack;
private final IChannelControlBlock ccb;
FullFrameChannelReadInterface(IChannelControlBlock ccb) {
this.ccb = ccb;
- riEmptyStack = new ArrayDeque<>();
+ riEmptyStack = new LinkedBlockingDeque<>();
credits = 0;
emptyBufferAcceptor = buffer -> {
- int delta = buffer.remaining();
- synchronized (ccb) {
- if (ccb.isRemotelyClosed()) {
- return;
- }
- riEmptyStack.push(buffer);
+ if (ccb.isRemotelyClosed()) {
+ return;
}
+ riEmptyStack.push(buffer);
+ final int delta = buffer.remaining();
ccb.addPendingCredits(delta);
};
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/2921
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I33f0171e49b0ff972730a678e8b61a2070dc8832
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: stabilization-f69489
Gerrit-Owner: Murtadha Hubail <[email protected]>