Repository: hive
Updated Branches:
  refs/heads/master 2c7f2e9d6 -> df51738ad


HIVE-14091 : some errors are not propagated to LLAP external clients (Sergey Shelukhin/Jason Dere, reviewed by Jason Dere/Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/df51738a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/df51738a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/df51738a

Branch: refs/heads/master
Commit: df51738ad8c0de6e57b5b08bc20a06f0b3c734b7
Parents: 2c7f2e9
Author: Sergey Shelukhin <ser...@apache.org>
Authored: Thu Jun 30 12:21:22 2016 -0700
Committer: Sergey Shelukhin <ser...@apache.org>
Committed: Thu Jun 30 12:23:14 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/LlapBaseRecordReader.java   | 19 +++++++++++++++----
 .../hadoop/hive/llap/LlapBaseInputFormat.java    |  4 ++--
 .../hadoop/hive/llap/TestLlapOutputFormat.java   |  3 ++-
 3 files changed, 19 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/df51738a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
index f2700c8..59dec1b 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
@@ -54,8 +54,10 @@ public class LlapBaseRecordReader<V extends 
WritableComparable> implements Recor
   protected final LinkedBlockingQueue<ReaderEvent> readerEvents = new 
LinkedBlockingQueue<ReaderEvent>();
   protected final long timeout;
   protected final Closeable client;
+  private final Closeable socket;
 
-  public LlapBaseRecordReader(InputStream in, Schema schema, Class<V> clazz, 
JobConf job, Closeable client) {
+  public LlapBaseRecordReader(InputStream in, Schema schema,
+      Class<V> clazz, JobConf job, Closeable client, Closeable socket) {
     din = new DataInputStream(in);
     this.schema = schema;
     this.clazz = clazz;
@@ -63,6 +65,7 @@ public class LlapBaseRecordReader<V extends 
WritableComparable> implements Recor
     this.timeout = 3 * HiveConf.getTimeVar(job,
         HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
     this.client = client;
+    this.socket = socket;
   }
 
   public Schema getSchema() {
@@ -78,6 +81,7 @@ public class LlapBaseRecordReader<V extends 
WritableComparable> implements Recor
       LOG.error("Error closing input stream:" + err.getMessage(), err);
       caughtException = err;
     }
+    // Don't close the socket - the stream already does that if needed.
 
     if (client != null) {
       try {
@@ -152,9 +156,10 @@ public class LlapBaseRecordReader<V extends 
WritableComparable> implements Recor
           ReaderEvent event = getReaderEvent();
           switch (event.getEventType()) {
             case ERROR:
-              throw new IOException("Received reader event error: " + 
event.getMessage());
+              throw new IOException("Received reader event error: " + 
event.getMessage(), io);
             default:
-              throw new IOException("Got reader event type " + 
event.getEventType() + ", expected error event");
+              throw new IOException("Got reader event type " + 
event.getEventType()
+                  + ", expected error event", io);
           }
         }
       } else {
@@ -214,7 +219,13 @@ public class LlapBaseRecordReader<V extends 
WritableComparable> implements Recor
         if (LOG.isDebugEnabled()) {
           LOG.debug("Interrupting reader thread due to reader event with error 
" + event.getMessage());
         }
-        getReaderThread().interrupt();
+        readerThread.interrupt();
+        try {
+          socket.close();
+        } catch (IOException e) {
+          // Leave the client to time out.
+          LOG.error("Cannot close the socket on error", e);
+        }
         break;
       default:
         throw new RuntimeException("Unhandled ReaderEvent type " + 
event.getEventType() + " with message " + event.getMessage());

http://git-wip-us.apache.org/repos/asf/hive/blob/df51738a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 6d63797..aef5762 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -170,8 +170,8 @@ public class LlapBaseInputFormat<V extends 
WritableComparable<?>>
     LOG.info("Registered id: " + fragmentId);
 
     @SuppressWarnings("rawtypes")
-    LlapBaseRecordReader recordReader = new LlapBaseRecordReader(
-        socket.getInputStream(), llapSplit.getSchema(), Text.class, job, llapClient);
+    LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(),
+        llapSplit.getSchema(), Text.class, job, llapClient, (java.io.Closeable)socket);
     umbilicalResponder.setRecordReader(recordReader);
     return recordReader;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/df51738a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
index 2288cd4..4159be5 100644
--- a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
@@ -103,7 +103,8 @@ public class TestLlapOutputFormat {
       writer.close(null);
 
       InputStream in = socket.getInputStream();
-      LlapBaseRecordReader reader = new LlapBaseRecordReader(in, null, 
Text.class, job, null);
+      LlapBaseRecordReader reader = new LlapBaseRecordReader(
+          in, null, Text.class, job, null, null);
 
       LOG.debug("Have record reader");
 

Reply via email to