>From Ali Alsuliman <[email protected]>:

Ali Alsuliman has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18223 )


Change subject: WIP: close file network channels early
......................................................................

WIP: close file network channels early

Change-Id: I6f63252b64d3b70d407f7734edad4cd98dda493c
---
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
M 
hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
M 
hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java
M 
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java
11 files changed, 70 insertions(+), 16 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/23/18223/1

diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
index cfd251b..0f6f6a3 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
@@ -65,7 +65,7 @@
             final PartitionId pid = new 
PartitionId(ctx.getJobletContext().getJobId(), codId, 1, 1);
             final ChannelControlBlock ccb = ncs.getNetworkManager()
                     
.connect(NetworkingUtil.getSocketAddress(ncs.getNetworkManager().getLocalNetworkAddress()));
-            final NetworkOutputChannel networkOutputChannel = new 
NetworkOutputChannel(ccb, 0);
+            final NetworkOutputChannel networkOutputChannel = 
NetworkOutputChannel.newChannel(ccb, 0);
             final MaterializingPipelinedPartition mpp =
                     new MaterializingPipelinedPartition(ctx, 
ncs.getPartitionManager(), pid, taId, ncs.getExecutor());
             mpp.open();
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java
index 207eaa3..1367f2e 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java
@@ -291,9 +291,6 @@
                 }
                 currentTupleCount += tupleCount;
             }
-            while (merger.nextFrame(frame)) {
-                // consume the remaining frames to close the network channels 
gracefully
-            }
         } finally {
             merger.close();
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java
 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java
index 542fda1..6930377 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java
@@ -101,12 +101,11 @@
         writeBuffer.flip();

         ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
-        ccb.getWriteInterface().getFullBufferAcceptor().close();
     }

     @Override
     public void close() throws HyracksDataException {
-
+        ccb.getWriteInterface().getFullBufferAcceptor().close();
     }

     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
index 17cdc3e..58fb368 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
@@ -37,19 +37,30 @@

     private final Deque<ByteBuffer> emptyStack;

+    private final boolean fileOutputChannel;
+
     private boolean aborted;

     private int frameSize = 32768;

     private int allocateCounter = 0;

-    public NetworkOutputChannel(ChannelControlBlock ccb, int nBuffers) {
+    private NetworkOutputChannel(ChannelControlBlock ccb, int nBuffers, 
boolean fileOutputChannel) {
         this.ccb = ccb;
         this.nBuffers = nBuffers;
+        this.fileOutputChannel = fileOutputChannel;
         emptyStack = new ArrayDeque<>(nBuffers);
         ccb.getWriteInterface().setEmptyBufferAcceptor(new 
WriteEmptyBufferAcceptor());
     }

+    public static NetworkOutputChannel newChannel(ChannelControlBlock ccb, int 
nBuffers) {
+        return new NetworkOutputChannel(ccb, nBuffers, false);
+    }
+
+    public static NetworkOutputChannel newFileChannel(ChannelControlBlock ccb, 
int nBuffers) {
+        return new NetworkOutputChannel(ccb, nBuffers, true);
+    }
+
     public void setFrameSize(int frameSize) {
         this.frameSize = frameSize;
     }
@@ -65,6 +76,9 @@
         while (buffer.hasRemaining()) {
             synchronized (this) {
                 while (true) {
+                    if (isRemoteClosed()) {
+                        return;
+                    }
                     if (aborted) {
                         throw new HyracksDataException("Connection has been 
aborted");
                     }
@@ -122,6 +136,16 @@
         }
     }

+    public void remoteClosed() {
+        synchronized (NetworkOutputChannel.this) {
+            NetworkOutputChannel.this.notifyAll();
+        }
+    }
+
+    public boolean isRemoteClosed() {
+        return fileOutputChannel && ccb.getRemoteEOS();
+    }
+
     private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
         @Override
         public void accept(ByteBuffer buffer) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
index 468a969..fe98e35 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
@@ -131,18 +131,21 @@

         @Override
         public void accept(ByteBuffer buffer) {
-            noc = new NetworkOutputChannel(ccb, nBuffers);
             long id = buffer.getLong();
             if (id == FileNetworkInputChannel.FILE_CHANNEL_CODE) {
+                noc = NetworkOutputChannel.newFileChannel(ccb, nBuffers);
                 handleFileRequest(buffer);
             } else {
+                noc = NetworkOutputChannel.newChannel(ccb, nBuffers);
                 handlePartitionRequest(buffer, id);
             }
         }

         @Override
         public void close() {
-
+            if (noc != null) {
+                noc.remoteClosed();
+            }
         }
 
         @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
index fe8f2af..f47f849 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
@@ -143,7 +143,7 @@
                 LOGGER.trace("Received initial result partition read request 
for JobId: " + jobId + " partition: "
                         + partition + " on channel: " + ccb);
             }
-            noc = new NetworkOutputChannel(ccb, nBuffers);
+            noc = NetworkOutputChannel.newChannel(ccb, nBuffers);
             try {
                 partitionManager.initializeResultPartitionReader(jobId, rsId, 
partition, noc);
             } catch (HyracksException e) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java
index 1471453..44523f9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java
@@ -57,7 +57,7 @@

     @Override
     public void writeTo(final IFrameWriter writer) {
-        executor.execute(new PartitionFileReader(ctx, partitionFile, 
ioManager, writer, false));
+        executor.execute(PartitionFileReader.newForMaterializedPartition(ctx, 
partitionFile, ioManager, writer));
     }

     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java
index 609b32a..d35a658 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java
@@ -27,6 +27,7 @@
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IFileHandle;
 import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.comm.channels.NetworkOutputChannel;

 public class PartitionFileReader implements Runnable {

@@ -34,15 +35,29 @@
     private final FileReference partitionFile;
     private final IIOManager ioManager;
     private final IFrameWriter writer;
+    private final NetworkOutputChannel networkOutputChannel;
     private final boolean deleteFile;
+    private final boolean fileOutputChannel;

-    public PartitionFileReader(IHyracksCommonContext ctx, FileReference 
partitionFile, IIOManager ioManager,
-            IFrameWriter writer, boolean deleteFile) {
+    private PartitionFileReader(IHyracksCommonContext ctx, FileReference 
partitionFile, IIOManager ioManager,
+            IFrameWriter writer, boolean deleteFile, boolean 
fileOutputChannel) {
         this.ctx = ctx;
         this.partitionFile = partitionFile;
         this.ioManager = ioManager;
         this.writer = writer;
         this.deleteFile = deleteFile;
+        this.fileOutputChannel = fileOutputChannel;
+        this.networkOutputChannel = fileOutputChannel ? (NetworkOutputChannel) 
writer : null;
+    }
+
+    public static PartitionFileReader 
newForFileOutputChannel(IHyracksCommonContext ctx, FileReference partitionFile,
+            IIOManager ioManager, NetworkOutputChannel writer) {
+        return new PartitionFileReader(ctx, partitionFile, ioManager, writer, 
true, true);
+    }
+
+    public static PartitionFileReader 
newForMaterializedPartition(IHyracksCommonContext ctx,
+            FileReference partitionFile, IIOManager ioManager, IFrameWriter 
writer) {
+        return new PartitionFileReader(ctx, partitionFile, ioManager, writer, 
false, false);
     }

     @Override
@@ -61,6 +76,9 @@
                     long offset = 0;
                     ByteBuffer buffer = ctx.allocateFrame();
                     while (true) {
+                        if (isRemoteClosed()) {
+                            break;
+                        }
                         buffer.clear();
                         long size = ioManager.syncRead(fh, offset, buffer);
                         if (size < 0) {
@@ -88,4 +106,8 @@
             throw new RuntimeException(e);
         }
     }
+
+    private boolean isRemoteClosed() {
+        return fileOutputChannel && networkOutputChannel.isRemoteClosed();
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java
index 2a105d9..93f1044 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java
@@ -53,6 +53,6 @@
         }
         ExecutorService executor = ncs.getExecutor();
         noc.setFrameSize(joblet.getInitialFrameSize());
-        executor.execute(new PartitionFileReader(joblet, fileRef, 
ncs.getIoManager(), noc, true));
+        executor.execute(PartitionFileReader.newForFileOutputChannel(joblet, 
fileRef, ncs.getIoManager(), noc));
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
index f32adcc..e8e7d2a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
@@ -120,7 +120,7 @@

     @Override
     public void close() throws HyracksDataException {
-
+        channel.close();
     }

     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 0151dbf..14d72de 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -122,7 +122,7 @@
         remoteCloseAck.set(true);
     }

-    boolean getRemoteEOS() {
+    public boolean getRemoteEOS() {
         return remoteClose.get();
     }


--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18223
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: neo
Gerrit-Change-Id: I6f63252b64d3b70d407f7734edad4cd98dda493c
Gerrit-Change-Number: 18223
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <[email protected]>
Gerrit-MessageType: newchange

Reply via email to