This is an automated email from the ASF dual-hosted git repository. mgaido pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push: new 7dee3cc [LIVY-571] cleanupStatement should not throw exception when statementId not exist 7dee3cc is described below commit 7dee3cc142f79a131d54a37c7a56bb697c160cc1 Author: Jeffrey(Xilang) Yan <7855100+yan...@users.noreply.github.com> AuthorDate: Mon Jul 22 12:15:32 2019 +0200 [LIVY-571] cleanupStatement should not throw exception when statementId not exist ## What changes were proposed in this pull request? `ThriftSessionState` is used to store query result by statementId. When an exception is thrown during execute query, no query result is stored. But when a statement is closed from beeline, a request is invoked to remove cached query result in `ThriftSessionState`. Remove statement query result from `ThriftSessionState` currently throws an exception, hiding the original query failure exception and message. The PR makes it return cleanly, so the proper exception is reported to the end user. ## How was this patch tested? Added unit tests. Author: Jeffrey(Xilang) Yan <7855100+yan...@users.noreply.github.com> Author: yantzu <7855100+yan...@users.noreply.github.com> Author: 80254702 <80254...@adc.com> Closes #182 from yantzu/LIVY-571. --- .../LivyExecuteStatementOperation.scala | 6 ++++- .../apache/livy/thriftserver/rpc/RpcClient.scala | 2 +- .../livy/thriftserver/ThriftServerSuites.scala | 31 +++++++++++++++++++++- .../thriftserver/session/CleanupStatementJob.java | 7 +++-- .../thriftserver/session/ThriftSessionState.java | 8 +++--- .../thriftserver/session/ThriftSessionTest.java | 12 ++++----- 6 files changed, 50 insertions(+), 16 deletions(-) diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyExecuteStatementOperation.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyExecuteStatementOperation.scala index a067788..ebb8e1d 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyExecuteStatementOperation.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyExecuteStatementOperation.scala @@ -174,7 +174,11 @@ class LivyExecuteStatementOperation( private def cleanup(state: OperationState) { if (statementId != null && rpcClientValid) { - rpcClient.cleanupStatement(sessionHandle, statementId).get() + val cleaned = rpcClient.cleanupStatement(sessionHandle, statementId).get() + if (!cleaned) { + warn(s"Fail to cleanup query $statementId (session = ${sessionHandle.getSessionId}), " + + "this message can be ignored if the query failed.") + } } setState(state) } diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/rpc/RpcClient.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/rpc/RpcClient.scala index beba6a9..b3b2ddf 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/rpc/RpcClient.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/rpc/RpcClient.scala @@ -76,7 +76,7 @@ class RpcClient(livySession: InteractiveSession) extends Logging { def cleanupStatement( sessionHandle: SessionHandle, statementId: String, - cancelJob: Boolean = false): JobHandle[_] = { + cancelJob: Boolean = false): JobHandle[java.lang.Boolean] = { info(s"Cleaning up remote session for statementId = $statementId") require(null != statementId, s"Invalid statementId specified. statementId = $statementId") livySession.recordActivity() diff --git a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala index 3099436..a3d9e88 100644 --- a/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala +++ b/thriftserver/server/src/test/scala/org/apache/livy/thriftserver/ThriftServerSuites.scala @@ -17,7 +17,7 @@ package org.apache.livy.thriftserver -import java.sql.{Date, Statement} +import java.sql.{Date, SQLException, Statement} import org.apache.livy.LivyConf @@ -134,6 +134,35 @@ class BinaryThriftServerSuite extends ThriftServerBaseTest with CommonThriftTest statement.close() } } + + test("LIVY-571: returns a meaningful exception when database doesn't exist") { + assume(hiveSupportEnabled(formattedSparkVersion._1, livyConf)) + withJdbcConnection(jdbcUri("default")) { c => + val caught = intercept[SQLException] { + val statement = c.createStatement() + try { + statement.executeQuery("use invalid_database") + } finally { + statement.close() + } + } + assert(caught.getMessage.contains("Database 'invalid_database' not found")) + } + } + + test("LIVY-571: returns a meaningful exception when global_temp table doesn't exist") { + withJdbcConnection { c => + val caught = intercept[SQLException] { + val statement = c.createStatement() + try { + statement.executeQuery("select * from global_temp.invalid_table") + } finally { + statement.close() + } + } + assert(caught.getMessage.contains("Table or view not found: `global_temp`.`invalid_table`")) + } + } } class HttpThriftServerSuite extends ThriftServerBaseTest with CommonThriftTests { diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupStatementJob.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupStatementJob.java index b8a0ee8..bc62b62 100644 --- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupStatementJob.java +++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/CleanupStatementJob.java @@ -23,7 +23,7 @@ import org.apache.livy.JobContext; /** * Job used to clean up state held for a statement. */ -public class CleanupStatementJob implements Job<Void> { +public class CleanupStatementJob implements Job<Boolean> { private final String sessionId; private final String statementId; @@ -38,10 +38,9 @@ public class CleanupStatementJob implements Job<Void> { } @Override - public Void call(JobContext ctx) { + public Boolean call(JobContext ctx) { ThriftSessionState session = ThriftSessionState.get(ctx, sessionId); - session.cleanupStatement(statementId); - return null; + return session.cleanupStatement(statementId); } } diff --git a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java index 1a89d21..1d71259 100644 --- a/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java +++ b/thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ThriftSessionState.java @@ -101,12 +101,14 @@ class ThriftSessionState { return st; } - void cleanupStatement(String statementId) { + boolean cleanupStatement(String statementId) { checkNotNull(statementId, "No statement ID."); if (statements.remove(statementId) == null) { - throw statementNotFound(statementId); + return false; + } else { + ctx.sc().cancelJobGroup(statementId); + return true; } - ctx.sc().cancelJobGroup(statementId); } void dispose() { diff --git a/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java b/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java index f1d8011..4dd30a2 100644 --- a/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java +++ b/thriftserver/session/src/test/java/org/apache/livy/thriftserver/session/ThriftSessionTest.java @@ -83,17 +83,17 @@ public class ThriftSessionTest { // Start a second session. Try to cleanup a statement that belongs to another session. String s2 = nextSession(); waitFor(new RegisterSessionJob(s2)); - expectError(new CleanupStatementJob(s2, st1), "not found in session"); + assertFalse(waitFor(new CleanupStatementJob(s2, st1))); waitFor(new UnregisterSessionJob(s2)); // Clean up the statement's state. - waitFor(new CleanupStatementJob(s1, st1)); - expectError(new CleanupStatementJob(s1, st1), "not found in session"); + assertTrue(waitFor(new CleanupStatementJob(s1, st1))); + assertFalse(waitFor(new CleanupStatementJob(s1, st1))); // Insert data into the previously created table, and fetch results from it. String st2 = nextStatement(); waitFor(newSqlJob(s1, st2, "INSERT INTO test VALUES (1, \"one\"), (2, \"two\")")); - waitFor(new CleanupStatementJob(s1, st2)); + assertTrue(waitFor(new CleanupStatementJob(s1, st2))); String st3 = nextStatement(); waitFor(newSqlJob(s1, st3, "SELECT * FROM test")); @@ -111,7 +111,7 @@ public class ThriftSessionTest { assertEquals(Integer.valueOf(2), cols[0].get(1)); assertEquals("two", cols[1].get(1)); - waitFor(new CleanupStatementJob(s1, st3)); + assertTrue(waitFor(new CleanupStatementJob(s1, st3))); // Run a statement that returns a null, to make sure the receiving side sees it correctly. String st4 = nextStatement(); @@ -123,7 +123,7 @@ public class ThriftSessionTest { assertEquals(1, cols[0].size()); assertTrue(cols[0].getNulls().get(0)); - waitFor(new CleanupStatementJob(s1, st4)); + assertTrue(waitFor(new CleanupStatementJob(s1, st4))); // Tear down the session. waitFor(new UnregisterSessionJob(s1));