Repository: hive
Updated Branches:
  refs/heads/master 8ce0118ff -> 2c7f2e9d6


HIVE-14093: LLAP output format connection should wait for all writes to finish before closing channel (Jason Dere, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2c7f2e9d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2c7f2e9d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2c7f2e9d

Branch: refs/heads/master
Commit: 2c7f2e9d609e42154d0f699151f4e854051ba167
Parents: 8ce0118
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu Jun 30 11:15:58 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu Jun 30 11:17:49 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  3 +
 .../hadoop/hive/llap/ChannelOutputStream.java   | 73 ++++++++++++++------
 .../hive/llap/LlapOutputFormatService.java      | 14 ++--
 3 files changed, 57 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2c7f2e9d/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ad467c5..680b623 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2955,6 +2955,9 @@ public class HiveConf extends Configuration {
         "output after sending the fragment. The fragment will fail if its output is not claimed."),
     LLAP_DAEMON_OUTPUT_SERVICE_SEND_BUFFER_SIZE("hive.llap.daemon.output.service.send.buffer.size",
         128 * 1024, "Send buffer size to be used by LLAP daemon output service"),
+    LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES("hive.llap.daemon.output.service.max.pending.writes",
+        8, "Maximum number of queued writes allowed per connection when sending data\n" +
+        " via the LLAP output service to external clients."),
     LLAP_ENABLE_GRACE_JOIN_IN_LLAP("hive.llap.enable.grace.join.in.llap", false,
         "Override if grace join should be allowed to run in llap."),
 

http://git-wip-us.apache.org/repos/asf/hive/blob/2c7f2e9d/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
index 239e061..dbe90d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
@@ -40,25 +40,45 @@ public class ChannelOutputStream extends OutputStream {
   private ByteBuf buf;
   private byte[] singleByte = new byte[1];
   private boolean closed = false;
-  private final Object channelWritabilityMonitor;
+  private final Object writeMonitor = new Object();
+  private final int maxPendingWrites;
+  private volatile int pendingWrites = 0;
 
-  private ChannelFutureListener listener = new ChannelFutureListener() {
+  private ChannelFutureListener writeListener = new ChannelFutureListener() {
+    @Override
+    public void operationComplete(ChannelFuture future) {
+
+      pendingWrites--;
+
+      if (future.isCancelled()) {
+        LOG.error("Write cancelled on ID " + id);
+      } else if (!future.isSuccess()) {
+        LOG.error("Write error on ID " + id, future.cause());
+      }
+
+      synchronized (writeMonitor) {
+        writeMonitor.notifyAll();
+      }
+    }
+  };
+
+  private ChannelFutureListener closeListener = new ChannelFutureListener() {
     @Override
     public void operationComplete(ChannelFuture future) {
       if (future.isCancelled()) {
-        LOG.error(id + " was cancelled");
+        LOG.error("Close cancelled on ID " + id);
       } else if (!future.isSuccess()) {
-        LOG.error("Error on ID " + id, future.cause());
+        LOG.error("Close failed on ID " + id, future.cause());
       }
     }
   };
 
-  public ChannelOutputStream(ChannelHandlerContext chc, String id, int bufSize, final Object monitor) {
+  public ChannelOutputStream(ChannelHandlerContext chc, String id, int bufSize, int maxOutstandingWrites) {
     this.chc = chc;
     this.id = id;
     this.bufSize = bufSize;
     this.buf = chc.alloc().buffer(bufSize);
-    this.channelWritabilityMonitor = monitor;
+    this.maxPendingWrites = maxOutstandingWrites;
   }
 
   @Override
@@ -109,10 +129,13 @@ public class ChannelOutputStream extends OutputStream {
       LOG.error("Error flushing stream before close", err);
     }
 
+    closed = true;
+
+    // Wait for all writes to finish before we actually close.
+    waitForWritesToFinish(0);
+
     try {
-      chc.close().addListener(listener).sync();
-    } catch (InterruptedException err) {
-      throw new IOException(err);
+      chc.close().addListener(closeListener);
     } finally {
       buf.release();
       buf = null;
@@ -121,26 +144,30 @@ public class ChannelOutputStream extends OutputStream {
     }
   }
 
+  private void waitForWritesToFinish(int desiredWriteCount) throws IOException {
+    synchronized (writeMonitor) {
+      // to prevent spurious wake up
+      while (pendingWrites > desiredWriteCount) {
+        try {
+          writeMonitor.wait();
+        } catch (InterruptedException ie) {
+          throw new IOException("Interrupted while waiting for write operations to finish for " + id);
+        }
+      }
+    }
+  }
+
   private void writeToChannel() throws IOException {
     if (closed) {
       throw new IOException("Already closed: " + id);
     }
 
-    chc.writeAndFlush(buf.copy()).addListener(listener);
-    buf.clear();
+    // Wait if we have exceeded our max pending write count
+    waitForWritesToFinish(maxPendingWrites - 1);
 
-    // if underlying channel is not writable (perhaps because of slow consumer) wait for
-    // notification about writable state change
-    synchronized (channelWritabilityMonitor) {
-      // to prevent spurious wake up
-      while (!chc.channel().isWritable()) {
-        try {
-          channelWritabilityMonitor.wait();
-        } catch (InterruptedException e) {
-          throw new IOException("Interrupted when waiting for channel writability state change", e);
-        }
-      }
-    }
+    pendingWrites++;
+    chc.writeAndFlush(buf.copy()).addListener(writeListener);
+    buf.clear();
   }
 
   private void writeInternal(byte[] b, int off, int len) throws IOException {

http://git-wip-us.apache.org/repos/asf/hive/blob/2c7f2e9d/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index 825488f..0619b79 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -166,7 +166,7 @@ public class LlapOutputFormatService {
   protected class LlapOutputFormatServiceHandler
     extends SimpleChannelInboundHandler<LlapOutputSocketInitMessage> {
     private final int sendBufferSize;
-    private final Object channelWritabilityMonitor = new Object();
+
     public LlapOutputFormatServiceHandler(final int sendBufferSize) {
       this.sendBufferSize = sendBufferSize;
     }
@@ -194,9 +194,11 @@ public class LlapOutputFormatService {
         }
       }
       LOG.debug("registering socket for: " + id);
+      int maxPendingWrites = HiveConf.getIntVar(conf,
+          HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES);
       @SuppressWarnings("rawtypes")
       LlapRecordWriter writer = new LlapRecordWriter(
-          new ChannelOutputStream(ctx, id, sendBufferSize, channelWritabilityMonitor));
+          new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites));
       boolean isFailed = true;
       synchronized (lock) {
         if (!writers.containsKey(id)) {
@@ -222,14 +224,6 @@ public class LlapOutputFormatService {
       }
       LOG.error(error);
     }
-
-    @Override
-    public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
-      super.channelWritabilityChanged(ctx);
-      synchronized (channelWritabilityMonitor) {
-        channelWritabilityMonitor.notifyAll();
-      }
-    }
   }
 
   protected class LlapOutputFormatChannelCloseListener implements ChannelFutureListener {

Reply via email to