Repository: hive Updated Branches: refs/heads/master 696104fab -> 2f285aea0
HIVE-13900: HiveStatement.executeAsync() may not work properly when hive.server2.async.exec.async.compile is turned on (Aihua Xu, reviewed by Jimmy Xiang) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2f285aea Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2f285aea Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2f285aea Branch: refs/heads/master Commit: 2f285aea07ddad904058b51c137abe633289794d Parents: 696104f Author: Aihua Xu <aihu...@apache.org> Authored: Wed Jun 8 15:14:35 2016 -0400 Committer: Aihua Xu <aihu...@apache.org> Committed: Mon Jun 13 14:30:19 2016 -0400 ---------------------------------------------------------------------- .../org/apache/hive/jdbc/TestJdbcDriver2.java | 59 ++++++++++++++++++-- .../org/apache/hive/jdbc/HiveStatement.java | 24 +++++++- 2 files changed, 78 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/2f285aea/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java index 7243648..b0fa98f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java @@ -2734,7 +2734,33 @@ public void testParseUrlHttpMode() throws SQLException, JdbcUriParseException, @Test public void testSelectExecAsync() throws Exception { HiveStatement stmt = (HiveStatement) con.createStatement(); - ResultSet rs; + testSelect(stmt); + stmt.close(); + } + + @Test + public void testSelectExecAsync2() throws Exception { + HiveStatement stmt = (HiveStatement) con.createStatement(); + + stmt.execute("SET hive.driver.parallel.compilation=true"); + stmt.execute("SET hive.server2.async.exec.async.compile=true"); + + testSelect(stmt); + stmt.close(); + } + + @Test + public void testSelectExecAsync3() throws Exception { + HiveStatement stmt = (HiveStatement) con.createStatement(); + + stmt.execute("SET hive.driver.parallel.compilation=true"); + stmt.execute("SET hive.server2.async.exec.async.compile=false"); + + testSelect(stmt); + stmt.close(); + } + + private void testSelect(HiveStatement stmt) throws SQLException { // Expected row count of the join query we'll run int expectedCount = 1028; int rowCount = 0; @@ -2742,7 +2768,7 @@ public void testParseUrlHttpMode() throws SQLException, JdbcUriParseException, stmt.executeAsync("select t1.value as v11, " + "t2.value as v12 from " + tableName + " t1 join " + tableName + " t2 on t1.under_col = t2.under_col"); assertTrue(isResulSet); - rs = stmt.getResultSet(); + ResultSet rs = stmt.getResultSet(); assertNotNull(rs); // ResultSet#next blocks until the async query is complete while (rs.next()) { @@ -2751,7 +2777,6 @@ public void testParseUrlHttpMode() throws SQLException, JdbcUriParseException, assertNotNull(value); } assertEquals(rowCount, expectedCount); - stmt.close(); } /** @@ -2789,6 +2814,33 @@ public void testParseUrlHttpMode() throws SQLException, JdbcUriParseException, @Test public void testInsertOverwriteExecAsync() throws Exception { HiveStatement stmt = (HiveStatement) con.createStatement(); + testInsertOverwrite(stmt); + stmt.close(); + } + + @Test + public void testInsertOverwriteExecAsync2() throws Exception { + HiveStatement stmt = (HiveStatement) con.createStatement(); + + stmt.execute("SET hive.driver.parallel.compilation=true"); + stmt.execute("SET hive.server2.async.exec.async.compile=true"); + + testInsertOverwrite(stmt); + stmt.close(); + } + + @Test + public void testInsertOverwriteExecAsync3() throws Exception { + HiveStatement stmt = (HiveStatement) con.createStatement(); + + stmt.execute("SET hive.driver.parallel.compilation=true"); + stmt.execute("SET hive.server2.async.exec.async.compile=false"); + + testInsertOverwrite(stmt); + stmt.close(); + } + + private void testInsertOverwrite(HiveStatement stmt) throws SQLException { String tblName = "testInsertOverwriteExecAsync"; int rowCount = 0; stmt.execute("create table " + tblName + " (col1 int , col2 string)"); @@ -2807,6 +2859,5 @@ public void testParseUrlHttpMode() throws SQLException, JdbcUriParseException, } assertEquals(rowCount, dataFileRowCount); stmt.execute("drop table " + tblName); - stmt.close(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/2f285aea/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index c4784c3..a242501 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -278,7 +278,8 @@ public class HiveStatement implements java.sql.Statement { */ public boolean executeAsync(String sql) throws SQLException { runAsyncOnServer(sql); - if (!stmtHandle.isHasResultSet()) { + TGetOperationStatusResp status = waitForResultSetStatus(); + if (!status.isHasResultSet()) { return false; } resultSet = @@ -318,6 +319,27 @@ public class HiveStatement implements java.sql.Statement { } } + /** + * Poll the result set status by checking if isSetHasResultSet is set + * @return + * @throws SQLException + */ + private TGetOperationStatusResp waitForResultSetStatus() throws SQLException { + TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle); + TGetOperationStatusResp statusResp = null; + + while(statusResp == null || !statusResp.isSetHasResultSet()) { + try { + statusResp = client.GetOperationStatus(statusReq); + } catch (TException e) { + isLogBeingGenerated = false; + throw new SQLException(e.toString(), "08S01", e); + } + } + + return statusResp; + } + TGetOperationStatusResp waitForOperationToComplete() throws SQLException { TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle); TGetOperationStatusResp statusResp = null;