This is an automated email from the ASF dual-hosted git repository.

mgaido pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dee3cc  [LIVY-571] cleanupStatement should not throw exception when 
statementId not exist
7dee3cc is described below

commit 7dee3cc142f79a131d54a37c7a56bb697c160cc1
Author: Jeffrey(Xilang) Yan <7855100+yan...@users.noreply.github.com>
AuthorDate: Mon Jul 22 12:15:32 2019 +0200

    [LIVY-571] cleanupStatement should not throw exception when statementId not 
exist
    
    ## What changes were proposed in this pull request?
    
    `ThriftSessionState` is used to store query result by statementId. When an 
exception is thrown during execute query, no query result is stored. But when a 
statement is closed from beeline, a request is invoked to remove cached query 
result in `ThriftSessionState`.
    
    Remove statement query result from `ThriftSessionState` currently throws an 
exception, hiding the original query failure exception and message. The PR 
makes it return cleanly, so the proper exception is reported to the end user.
    
    ## How was this patch tested?
    
    Added unit tests.
    
    Author: Jeffrey(Xilang) Yan <7855100+yan...@users.noreply.github.com>
    Author: yantzu <7855100+yan...@users.noreply.github.com>
    Author: 80254702 <80254...@adc.com>
    
    Closes #182 from yantzu/LIVY-571.
---
 .../LivyExecuteStatementOperation.scala            |  6 ++++-
 .../apache/livy/thriftserver/rpc/RpcClient.scala   |  2 +-
 .../livy/thriftserver/ThriftServerSuites.scala     | 31 +++++++++++++++++++++-
 .../thriftserver/session/CleanupStatementJob.java  |  7 +++--
 .../thriftserver/session/ThriftSessionState.java   |  8 +++---
 .../thriftserver/session/ThriftSessionTest.java    | 12 ++++-----
 6 files changed, 50 insertions(+), 16 deletions(-)

diff --git 
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyExecuteStatementOperation.scala
 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyExecuteStatementOperation.scala
index a067788..ebb8e1d 100644
--- 
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyExecuteStatementOperation.scala
+++ 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyExecuteStatementOperation.scala
@@ -174,7 +174,11 @@ class LivyExecuteStatementOperation(
 
   private def cleanup(state: OperationState) {
     if (statementId != null && rpcClientValid) {
-      rpcClient.cleanupStatement(sessionHandle, statementId).get()
+      val cleaned = rpcClient.cleanupStatement(sessionHandle, 
statementId).get()
+      if (!cleaned) {
+        warn(s"Fail to cleanup query $statementId (session = 
${sessionHandle.getSessionId}), " +
+          "this message can be ignored if the query failed.")
+      }
     }
     setState(state)
   }
diff --git 
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/rpc/RpcClient.scala
 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/rpc/RpcClient.scala
index beba6a9..b3b2ddf 100644
--- 
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/rpc/RpcClient.scala
+++ 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/rpc/RpcClient.scala
@@ -76,7 +76,7 @@ class RpcClient(livySession: InteractiveSession) extends 
Logging {
   def cleanupStatement(
       sessionHandle: SessionHandle,
       statementId: String,
-      cancelJob: Boolean = false): JobHandle[_] = {
+      cancelJob: Boolean = false): JobHandle[java.lang.Boolean] = {
     info(s"Cleaning up remote session for statementId = $statementId")
     require(null != statementId, s"Invalid statementId specified. statementId 
= $statementId")
     livySession.recordActivity()
diff --git 
a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
 
b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
index 3099436..a3d9e88 100644
--- 
a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
+++ 
b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala
@@ -17,7 +17,7 @@
 
 package org.apache.livy.thriftserver
 
-import java.sql.{Date, Statement}
+import java.sql.{Date, SQLException, Statement}
 
 import org.apache.livy.LivyConf
 
@@ -134,6 +134,35 @@ class BinaryThriftServerSuite extends ThriftServerBaseTest 
with CommonThriftTest
       statement.close()
     }
   }
+
+  test("LIVY-571: returns a meaningful exception when database doesn't exist") 
{
+    assume(hiveSupportEnabled(formattedSparkVersion._1, livyConf))
+    withJdbcConnection(jdbcUri("default")) { c =>
+      val caught = intercept[SQLException] {
+        val statement = c.createStatement()
+        try {
+          statement.executeQuery("use invalid_database")
+        } finally {
+          statement.close()
+        }
+      }
+      assert(caught.getMessage.contains("Database 'invalid_database' not 
found"))
+    }
+  }
+
+  test("LIVY-571: returns a meaningful exception when global_temp table 
doesn't exist") {
+    withJdbcConnection { c =>
+      val caught = intercept[SQLException] {
+        val statement = c.createStatement()
+        try {
+          statement.executeQuery("select * from global_temp.invalid_table")
+        } finally {
+          statement.close()
+        }
+      }
+      assert(caught.getMessage.contains("Table or view not found: 
`global_temp`.`invalid_table`"))
+    }
+  }
 }
 
 class HttpThriftServerSuite extends ThriftServerBaseTest with 
CommonThriftTests {
diff --git 
a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupStatementJob.java
 
b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupStatementJob.java
index b8a0ee8..bc62b62 100644
--- 
a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupStatementJob.java
+++ 
b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupStatementJob.java
@@ -23,7 +23,7 @@ import org.apache.livy.JobContext;
 /**
  * Job used to clean up state held for a statement.
  */
-public class CleanupStatementJob implements Job<Void> {
+public class CleanupStatementJob implements Job<Boolean> {
 
   private final String sessionId;
   private final String statementId;
@@ -38,10 +38,9 @@ public class CleanupStatementJob implements Job<Void> {
   }
 
   @Override
-  public Void call(JobContext ctx) {
+  public Boolean call(JobContext ctx) {
     ThriftSessionState session = ThriftSessionState.get(ctx, sessionId);
-    session.cleanupStatement(statementId);
-    return null;
+    return session.cleanupStatement(statementId);
   }
 
 }
diff --git 
a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java
 
b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java
index 1a89d21..1d71259 100644
--- 
a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java
+++ 
b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java
@@ -101,12 +101,14 @@ class ThriftSessionState {
     return st;
   }
 
-  void cleanupStatement(String statementId) {
+  boolean cleanupStatement(String statementId) {
     checkNotNull(statementId, "No statement ID.");
     if (statements.remove(statementId) == null) {
-      throw statementNotFound(statementId);
+      return false;
+    } else {
+      ctx.sc().cancelJobGroup(statementId);
+      return true;
     }
-    ctx.sc().cancelJobGroup(statementId);
   }
 
   void dispose() {
diff --git 
a/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java
 
b/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java
index f1d8011..4dd30a2 100644
--- 
a/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java
+++ 
b/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java
@@ -83,17 +83,17 @@ public class ThriftSessionTest {
     // Start a second session. Try to cleanup a statement that belongs to 
another session.
     String s2 = nextSession();
     waitFor(new RegisterSessionJob(s2));
-    expectError(new CleanupStatementJob(s2, st1), "not found in session");
+    assertFalse(waitFor(new CleanupStatementJob(s2, st1)));
     waitFor(new UnregisterSessionJob(s2));
 
     // Clean up the statement's state.
-    waitFor(new CleanupStatementJob(s1, st1));
-    expectError(new CleanupStatementJob(s1, st1), "not found in session");
+    assertTrue(waitFor(new CleanupStatementJob(s1, st1)));
+    assertFalse(waitFor(new CleanupStatementJob(s1, st1)));
 
     // Insert data into the previously created table, and fetch results from 
it.
     String st2 = nextStatement();
     waitFor(newSqlJob(s1, st2, "INSERT INTO test VALUES (1, \"one\"), (2, 
\"two\")"));
-    waitFor(new CleanupStatementJob(s1, st2));
+    assertTrue(waitFor(new CleanupStatementJob(s1, st2)));
 
     String st3 = nextStatement();
     waitFor(newSqlJob(s1, st3, "SELECT * FROM test"));
@@ -111,7 +111,7 @@ public class ThriftSessionTest {
     assertEquals(Integer.valueOf(2), cols[0].get(1));
     assertEquals("two", cols[1].get(1));
 
-    waitFor(new CleanupStatementJob(s1, st3));
+    assertTrue(waitFor(new CleanupStatementJob(s1, st3)));
 
     // Run a statement that returns a null, to make sure the receiving side 
sees it correctly.
     String st4 = nextStatement();
@@ -123,7 +123,7 @@ public class ThriftSessionTest {
     assertEquals(1, cols[0].size());
     assertTrue(cols[0].getNulls().get(0));
 
-    waitFor(new CleanupStatementJob(s1, st4));
+    assertTrue(waitFor(new CleanupStatementJob(s1, st4)));
 
     // Tear down the session.
     waitFor(new UnregisterSessionJob(s1));

Reply via email to