Repository: hive Updated Branches: refs/heads/master 8ce0118ff -> 2c7f2e9d6
HIVE-14093: LLAP output format connection should wait for all writes to finish before closing channel (Jason Dere, reviewed by Prasanth Jayachandran) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2c7f2e9d Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2c7f2e9d Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2c7f2e9d Branch: refs/heads/master Commit: 2c7f2e9d609e42154d0f699151f4e854051ba167 Parents: 8ce0118 Author: Jason Dere <jd...@hortonworks.com> Authored: Thu Jun 30 11:15:58 2016 -0700 Committer: Jason Dere <jd...@hortonworks.com> Committed: Thu Jun 30 11:17:49 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 3 + .../hadoop/hive/llap/ChannelOutputStream.java | 73 ++++++++++++++------ .../hive/llap/LlapOutputFormatService.java | 14 ++-- 3 files changed, 57 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/2c7f2e9d/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index ad467c5..680b623 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2955,6 +2955,9 @@ public class HiveConf extends Configuration { "output after sending the fragment. The fragment will fail if its output is not claimed."), LLAP_DAEMON_OUTPUT_SERVICE_SEND_BUFFER_SIZE("hive.llap.daemon.output.service.send.buffer.size", 128 * 1024, "Send buffer size to be used by LLAP daemon output service"), + LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES("hive.llap.daemon.output.service.max.pending.writes", + 8, "Maximum number of queued writes allowed per connection when sending data\n" + + " via the LLAP output service to external clients."), LLAP_ENABLE_GRACE_JOIN_IN_LLAP("hive.llap.enable.grace.join.in.llap", false, "Override if grace join should be allowed to run in llap."), http://git-wip-us.apache.org/repos/asf/hive/blob/2c7f2e9d/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java index 239e061..dbe90d6 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java @@ -40,25 +40,45 @@ public class ChannelOutputStream extends OutputStream { private ByteBuf buf; private byte[] singleByte = new byte[1]; private boolean closed = false; - private final Object channelWritabilityMonitor; + private final Object writeMonitor = new Object(); + private final int maxPendingWrites; + private volatile int pendingWrites = 0; - private ChannelFutureListener listener = new ChannelFutureListener() { + private ChannelFutureListener writeListener = new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + + pendingWrites--; + + if (future.isCancelled()) { + LOG.error("Write cancelled on ID " + id); + } else if (!future.isSuccess()) { + LOG.error("Write error on ID " + id, future.cause()); + } + + synchronized (writeMonitor) { + writeMonitor.notifyAll(); + } + } + }; + + private ChannelFutureListener closeListener = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { if (future.isCancelled()) { - LOG.error(id + " was cancelled"); + LOG.error("Close cancelled on ID " + id); } else if (!future.isSuccess()) { - LOG.error("Error on ID " + id, future.cause()); + LOG.error("Close failed on ID " + id, future.cause()); } } }; - public ChannelOutputStream(ChannelHandlerContext chc, String id, int bufSize, final Object monitor) { + public ChannelOutputStream(ChannelHandlerContext chc, String id, int bufSize, int maxOutstandingWrites) { this.chc = chc; this.id = id; this.bufSize = bufSize; this.buf = chc.alloc().buffer(bufSize); - this.channelWritabilityMonitor = monitor; + this.maxPendingWrites = maxOutstandingWrites; } @Override @@ -109,10 +129,13 @@ public class ChannelOutputStream extends OutputStream { LOG.error("Error flushing stream before close", err); } + closed = true; + + // Wait for all writes to finish before we actually close. + waitForWritesToFinish(0); + try { - chc.close().addListener(listener).sync(); - } catch (InterruptedException err) { - throw new IOException(err); + chc.close().addListener(closeListener); } finally { buf.release(); buf = null; @@ -121,26 +144,30 @@ public class ChannelOutputStream extends OutputStream { } } + private void waitForWritesToFinish(int desiredWriteCount) throws IOException { + synchronized (writeMonitor) { + // to prevent spurious wake up + while (pendingWrites > desiredWriteCount) { + try { + writeMonitor.wait(); + } catch (InterruptedException ie) { + throw new IOException("Interrupted while waiting for write operations to finish for " + id); + } + } + } + } + private void writeToChannel() throws IOException { if (closed) { throw new IOException("Already closed: " + id); } - chc.writeAndFlush(buf.copy()).addListener(listener); - buf.clear(); + // Wait if we have exceeded our max pending write count + waitForWritesToFinish(maxPendingWrites - 1); - // if underlying channel is not writable (perhaps because of slow consumer) wait for - // notification about writable state change - synchronized (channelWritabilityMonitor) { - // to prevent spurious wake up - while (!chc.channel().isWritable()) { - try { - channelWritabilityMonitor.wait(); - } catch (InterruptedException e) { - throw new IOException("Interrupted when waiting for channel writability state change", e); - } - } - } + pendingWrites++; + chc.writeAndFlush(buf.copy()).addListener(writeListener); + buf.clear(); } private void writeInternal(byte[] b, int off, int len) throws IOException { http://git-wip-us.apache.org/repos/asf/hive/blob/2c7f2e9d/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java index 825488f..0619b79 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java @@ -166,7 +166,7 @@ public class LlapOutputFormatService { protected class LlapOutputFormatServiceHandler extends SimpleChannelInboundHandler<LlapOutputSocketInitMessage> { private final int sendBufferSize; - private final Object channelWritabilityMonitor = new Object(); + public LlapOutputFormatServiceHandler(final int sendBufferSize) { this.sendBufferSize = sendBufferSize; } @@ -194,9 +194,11 @@ public class LlapOutputFormatService { } } LOG.debug("registering socket for: " + id); + int maxPendingWrites = HiveConf.getIntVar(conf, + HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES); @SuppressWarnings("rawtypes") LlapRecordWriter writer = new LlapRecordWriter( - new ChannelOutputStream(ctx, id, sendBufferSize, channelWritabilityMonitor)); + new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites)); boolean isFailed = true; synchronized (lock) { if (!writers.containsKey(id)) { @@ -222,14 +224,6 @@ public class LlapOutputFormatService { } LOG.error(error); } - - @Override - public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception { - super.channelWritabilityChanged(ctx); - synchronized (channelWritabilityMonitor) { - channelWritabilityMonitor.notifyAll(); - } - } } protected class LlapOutputFormatChannelCloseListener implements ChannelFutureListener {