From Ali Alsuliman <[email protected]>:
Ali Alsuliman has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18223 )
Change subject: WIP: close file network channels early
......................................................................
WIP: close file network channels early
Change-Id: I6f63252b64d3b70d407f7734edad4cd98dda493c
---
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
M hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
M hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java
11 files changed, 70 insertions(+), 16 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/23/18223/1
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
index cfd251b..0f6f6a3 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
@@ -65,7 +65,7 @@
final PartitionId pid = new
PartitionId(ctx.getJobletContext().getJobId(), codId, 1, 1);
final ChannelControlBlock ccb = ncs.getNetworkManager()
.connect(NetworkingUtil.getSocketAddress(ncs.getNetworkManager().getLocalNetworkAddress()));
- final NetworkOutputChannel networkOutputChannel = new
NetworkOutputChannel(ccb, 0);
+ final NetworkOutputChannel networkOutputChannel =
NetworkOutputChannel.newChannel(ccb, 0);
final MaterializingPipelinedPartition mpp =
new MaterializingPipelinedPartition(ctx,
ncs.getPartitionManager(), pid, taId, ncs.getExecutor());
mpp.open();
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java
index 207eaa3..1367f2e 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMedianAggregateFunction.java
@@ -291,9 +291,6 @@
}
currentTupleCount += tupleCount;
}
- while (merger.nextFrame(frame)) {
- // consume the remaining frames to close the network channels gracefully
- }
} finally {
merger.close();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java
index 542fda1..6930377 100644
---
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java
+++
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/FileNetworkInputChannel.java
@@ -101,12 +101,11 @@
writeBuffer.flip();
ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
- ccb.getWriteInterface().getFullBufferAcceptor().close();
}
@Override
public void close() throws HyracksDataException {
-
+ ccb.getWriteInterface().getFullBufferAcceptor().close();
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
index 17cdc3e..58fb368 100644
---
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
+++
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
@@ -37,19 +37,30 @@
private final Deque<ByteBuffer> emptyStack;
+ private final boolean fileOutputChannel;
+
private boolean aborted;
private int frameSize = 32768;
private int allocateCounter = 0;
- public NetworkOutputChannel(ChannelControlBlock ccb, int nBuffers) {
+ private NetworkOutputChannel(ChannelControlBlock ccb, int nBuffers,
boolean fileOutputChannel) {
this.ccb = ccb;
this.nBuffers = nBuffers;
+ this.fileOutputChannel = fileOutputChannel;
emptyStack = new ArrayDeque<>(nBuffers);
ccb.getWriteInterface().setEmptyBufferAcceptor(new
WriteEmptyBufferAcceptor());
}
+ public static NetworkOutputChannel newChannel(ChannelControlBlock ccb, int
nBuffers) {
+ return new NetworkOutputChannel(ccb, nBuffers, false);
+ }
+
+ public static NetworkOutputChannel newFileChannel(ChannelControlBlock ccb,
int nBuffers) {
+ return new NetworkOutputChannel(ccb, nBuffers, true);
+ }
+
public void setFrameSize(int frameSize) {
this.frameSize = frameSize;
}
@@ -65,6 +76,9 @@
while (buffer.hasRemaining()) {
synchronized (this) {
while (true) {
+ if (isRemoteClosed()) {
+ return;
+ }
if (aborted) {
throw new HyracksDataException("Connection has been aborted");
}
@@ -122,6 +136,16 @@
}
}
+ public void remoteClosed() {
+ synchronized (NetworkOutputChannel.this) {
+ NetworkOutputChannel.this.notifyAll();
+ }
+ }
+
+ public boolean isRemoteClosed() {
+ return fileOutputChannel && ccb.getRemoteEOS();
+ }
+
private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
@Override
public void accept(ByteBuffer buffer) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
index 468a969..fe98e35 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
@@ -131,18 +131,21 @@
@Override
public void accept(ByteBuffer buffer) {
- noc = new NetworkOutputChannel(ccb, nBuffers);
long id = buffer.getLong();
if (id == FileNetworkInputChannel.FILE_CHANNEL_CODE) {
+ noc = NetworkOutputChannel.newFileChannel(ccb, nBuffers);
handleFileRequest(buffer);
} else {
+ noc = NetworkOutputChannel.newChannel(ccb, nBuffers);
handlePartitionRequest(buffer, id);
}
}
@Override
public void close() {
-
+ if (noc != null) {
+ noc.remoteClosed();
+ }
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
index fe8f2af..f47f849 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
@@ -143,7 +143,7 @@
LOGGER.trace("Received initial result partition read request for JobId: " + jobId + " partition: "
+ partition + " on channel: " + ccb);
}
- noc = new NetworkOutputChannel(ccb, nBuffers);
+ noc = NetworkOutputChannel.newChannel(ccb, nBuffers);
try {
partitionManager.initializeResultPartitionReader(jobId, rsId,
partition, noc);
} catch (HyracksException e) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java
index 1471453..44523f9 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartition.java
@@ -57,7 +57,7 @@
@Override
public void writeTo(final IFrameWriter writer) {
- executor.execute(new PartitionFileReader(ctx, partitionFile,
ioManager, writer, false));
+ executor.execute(PartitionFileReader.newForMaterializedPartition(ctx,
partitionFile, ioManager, writer));
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java
index 609b32a..d35a658 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReader.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IFileHandle;
import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.comm.channels.NetworkOutputChannel;
public class PartitionFileReader implements Runnable {
@@ -34,15 +35,29 @@
private final FileReference partitionFile;
private final IIOManager ioManager;
private final IFrameWriter writer;
+ private final NetworkOutputChannel networkOutputChannel;
private final boolean deleteFile;
+ private final boolean fileOutputChannel;
- public PartitionFileReader(IHyracksCommonContext ctx, FileReference
partitionFile, IIOManager ioManager,
- IFrameWriter writer, boolean deleteFile) {
+ private PartitionFileReader(IHyracksCommonContext ctx, FileReference
partitionFile, IIOManager ioManager,
+ IFrameWriter writer, boolean deleteFile, boolean
fileOutputChannel) {
this.ctx = ctx;
this.partitionFile = partitionFile;
this.ioManager = ioManager;
this.writer = writer;
this.deleteFile = deleteFile;
+ this.fileOutputChannel = fileOutputChannel;
+ this.networkOutputChannel = fileOutputChannel ? (NetworkOutputChannel)
writer : null;
+ }
+
+ public static PartitionFileReader
newForFileOutputChannel(IHyracksCommonContext ctx, FileReference partitionFile,
+ IIOManager ioManager, NetworkOutputChannel writer) {
+ return new PartitionFileReader(ctx, partitionFile, ioManager, writer,
true, true);
+ }
+
+ public static PartitionFileReader
newForMaterializedPartition(IHyracksCommonContext ctx,
+ FileReference partitionFile, IIOManager ioManager, IFrameWriter
writer) {
+ return new PartitionFileReader(ctx, partitionFile, ioManager, writer,
false, false);
}
@Override
@@ -61,6 +76,9 @@
long offset = 0;
ByteBuffer buffer = ctx.allocateFrame();
while (true) {
+ if (isRemoteClosed()) {
+ break;
+ }
buffer.clear();
long size = ioManager.syncRead(fh, offset, buffer);
if (size < 0) {
@@ -88,4 +106,8 @@
throw new RuntimeException(e);
}
}
+
+ private boolean isRemoteClosed() {
+ return fileOutputChannel && networkOutputChannel.isRemoteClosed();
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java
index 2a105d9..93f1044 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionFileReaderUtil.java
@@ -53,6 +53,6 @@
}
ExecutorService executor = ncs.getExecutor();
noc.setFrameSize(joblet.getInitialFrameSize());
- executor.execute(new PartitionFileReader(joblet, fileRef,
ncs.getIoManager(), noc, true));
+ executor.execute(PartitionFileReader.newForFileOutputChannel(joblet,
fileRef, ncs.getIoManager(), noc));
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
index f32adcc..e8e7d2a 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
@@ -120,7 +120,7 @@
@Override
public void close() throws HyracksDataException {
-
+ channel.close();
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 0151dbf..14d72de 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -122,7 +122,7 @@
remoteCloseAck.set(true);
}
- boolean getRemoteEOS() {
+ public boolean getRemoteEOS() {
return remoteClose.get();
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18223
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: neo
Gerrit-Change-Id: I6f63252b64d3b70d407f7734edad4cd98dda493c
Gerrit-Change-Number: 18223
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <[email protected]>
Gerrit-MessageType: newchange