This is an automated email from the ASF dual-hosted git repository. dmollitor pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new a69e676 HIVE-23005: Consider Default JDBC Fetch Size From HS2 (David Mollitor reviewed by Naveen Gangam) a69e676 is described below commit a69e676e90dd023072fb15d384f41ddb7a164445 Author: David Mollitor <dmolli...@apache.org> AuthorDate: Wed Mar 25 18:01:11 2020 -0400 HIVE-23005: Consider Default JDBC Fetch Size From HS2 (David Mollitor reviewed by Naveen Gangam) --- .../org/apache/hive/jdbc/TestJdbcWithMiniHS2.java | 23 +--- .../java/org/apache/hive/jdbc/HiveConnection.java | 38 ++++--- .../java/org/apache/hive/jdbc/HiveStatement.java | 43 ++++--- .../org/apache/hive/jdbc/TestHiveStatement.java | 123 +++++++++++++++++---- ql/src/java/org/apache/hadoop/hive/ql/Driver.java | 6 +- .../hive/service/cli/operation/SQLOperation.java | 4 +- .../hive/service/cli/session/HiveSessionImpl.java | 18 --- .../hive/service/cli/thrift/ThriftCLIService.java | 14 ++- 8 files changed, 158 insertions(+), 111 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java index 7fa6796..2100906 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java @@ -1500,30 +1500,11 @@ public class TestJdbcWithMiniHS2 { @Test public void testFetchSize() throws Exception { - // Test setting fetch size below max Connection fsConn = getConnection(miniHS2.getJdbcURL("default", "fetchSize=50", ""), System.getProperty("user.name"), "bar"); Statement stmt = fsConn.createStatement(); - stmt.execute("set hive.server2.thrift.resultset.serialize.in.tasks=true"); - int fetchSize = stmt.getFetchSize(); - assertEquals(50, fetchSize); - stmt.close(); - fsConn.close(); - // Test setting fetch size above max - fsConn = getConnection( - miniHS2.getJdbcURL( - "default", - "fetchSize=" + 
(miniHS2.getHiveConf().getIntVar( - HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE) + 1), - ""), - System.getProperty("user.name"), "bar"); - stmt = fsConn.createStatement(); - stmt.execute("set hive.server2.thrift.resultset.serialize.in.tasks=true"); - fetchSize = stmt.getFetchSize(); - assertEquals( - miniHS2.getHiveConf().getIntVar( - HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE), - fetchSize); + stmt.execute("set"); + assertEquals(50, stmt.getFetchSize()); stmt.close(); fsConn.close(); } diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index cbf6632..7f0d8dc 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -28,6 +28,7 @@ import org.apache.hive.service.rpc.thrift.TSetClientInfoResp; import org.apache.hive.service.rpc.thrift.TSetClientInfoReq; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.common.auth.HiveAuthUtils; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hive.jdbc.Utils.JdbcConnectionParams; import org.apache.hive.service.auth.HiveAuthConstants; import org.apache.hive.service.auth.KerberosSaslHelper; @@ -70,6 +71,7 @@ import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; @@ -143,7 +145,8 @@ public class HiveConnection implements java.sql.Connection { private final List<TProtocolVersion> supportedProtocols = new LinkedList<TProtocolVersion>(); private int loginTimeout = 0; private TProtocolVersion protocol; - private int fetchSize = HiveStatement.DEFAULT_FETCH_SIZE; + private final int initFetchSize; + private int defaultFetchSize; private String initFile = null; private String wmPool = null, 
wmApp = null; private Properties clientInfo; @@ -261,9 +264,8 @@ public class HiveConnection implements java.sql.Connection { port = connParams.getPort(); isEmbeddedMode = connParams.isEmbeddedMode(); - if (sessConfMap.containsKey(JdbcConnectionParams.FETCH_SIZE)) { - fetchSize = Integer.parseInt(sessConfMap.get(JdbcConnectionParams.FETCH_SIZE)); - } + initFetchSize = Integer.parseInt(sessConfMap.getOrDefault(JdbcConnectionParams.FETCH_SIZE, "0")); + if (sessConfMap.containsKey(JdbcConnectionParams.INIT_FILE)) { initFile = sessConfMap.get(JdbcConnectionParams.INIT_FILE); } @@ -832,9 +834,6 @@ public class HiveConnection implements java.sql.Connection { } // switch the database openConf.put("use:database", connParams.getDbName()); - // set the fetchSize - openConf.put("set:hiveconf:hive.server2.thrift.resultset.default.fetch.size", - Integer.toString(fetchSize)); if (wmPool != null) { openConf.put("set:hivevar:wmpool", wmPool); } @@ -867,16 +866,19 @@ public class HiveConnection implements java.sql.Connection { protocol = openResp.getServerProtocolVersion(); sessHandle = openResp.getSessionHandle(); - // Update fetchSize if modified by server - String serverFetchSize = - openResp.getConfiguration().get("hive.server2.thrift.resultset.default.fetch.size"); - if (serverFetchSize != null) { - fetchSize = Integer.parseInt(serverFetchSize); + final String serverFetchSizeString = + openResp.getConfiguration().get(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.varname); + if (serverFetchSizeString == null) { + throw new IllegalStateException("Server returned a null default fetch size. 
Check that " + + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.varname + " is configured correctly."); + } + + this.defaultFetchSize = Integer.parseInt(serverFetchSizeString); + if (this.defaultFetchSize <= 0) { + throw new IllegalStateException("Default fetch size must be greater than 0"); } } catch (TException e) { - LOG.error("Error opening session", e); - throw new SQLException("Could not establish connection to " - + jdbcUriString + ": " + e.getMessage(), " 08S01", e); + throw new SQLException("Could not establish connection to " + jdbcUriString + ": " + e.getMessage(), " 08S01", e); } isClosed = false; } @@ -1107,7 +1109,7 @@ public class HiveConnection implements java.sql.Connection { if (isClosed) { throw new SQLException("Can't create Statement, connection is closed"); } - return new HiveStatement(this, client, sessHandle, fetchSize); + return new HiveStatement(this, client, sessHandle, false, initFetchSize, defaultFetchSize); } /* @@ -1127,8 +1129,8 @@ public class HiveConnection implements java.sql.Connection { throw new SQLException("Statement with resultset type " + resultSetType + " is not supported", "HYC00"); // Optional feature not implemented } - return new HiveStatement(this, client, sessHandle, - resultSetType == ResultSet.TYPE_SCROLL_INSENSITIVE, fetchSize); + return new HiveStatement(this, client, sessHandle, resultSetType == ResultSet.TYPE_SCROLL_INSENSITIVE, + initFetchSize, defaultFetchSize); } /* diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index 693203f..db965e7 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -20,6 +20,7 @@ package org.apache.hive.jdbc; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate; +import org.apache.hadoop.hive.conf.HiveConf; import 
org.apache.hive.jdbc.logs.InPlaceUpdateStream; import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.RowSetFactory; @@ -53,6 +54,7 @@ import java.util.Base64; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; /** * HiveStatement. @@ -60,13 +62,17 @@ import java.util.Map; */ public class HiveStatement implements java.sql.Statement { public static final Logger LOG = LoggerFactory.getLogger(HiveStatement.class.getName()); - public static final int DEFAULT_FETCH_SIZE = 1000; + + private static final int DEFAULT_FETCH_SIZE = + HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.defaultIntVal; + private final HiveConnection connection; private TCLIService.Iface client; private TOperationHandle stmtHandle = null; private final TSessionHandle sessHandle; Map<String,String> sessConf = new HashMap<String,String>(); - private int fetchSize = DEFAULT_FETCH_SIZE; + private int fetchSize; + private final int defaultFetchSize; private boolean isScrollableResultset = false; private boolean isOperationComplete = false; /** @@ -122,26 +128,22 @@ public class HiveStatement implements java.sql.Statement { public HiveStatement(HiveConnection connection, TCLIService.Iface client, TSessionHandle sessHandle) { - this(connection, client, sessHandle, false, DEFAULT_FETCH_SIZE); + this(connection, client, sessHandle, false, 0, DEFAULT_FETCH_SIZE); } - public HiveStatement(HiveConnection connection, TCLIService.Iface client, - TSessionHandle sessHandle, int fetchSize) { - this(connection, client, sessHandle, false, fetchSize); - } + public HiveStatement(HiveConnection connection, TCLIService.Iface client, TSessionHandle sessHandle, + boolean isScrollableResultset, int initFetchSize, int defaultFetchSize) { + this.connection = Objects.requireNonNull(connection); + this.client = Objects.requireNonNull(client); + this.sessHandle = Objects.requireNonNull(sessHandle); - public HiveStatement(HiveConnection 
connection, TCLIService.Iface client, - TSessionHandle sessHandle, boolean isScrollableResultset) { - this(connection, client, sessHandle, isScrollableResultset, DEFAULT_FETCH_SIZE); - } + if (initFetchSize < 0 || defaultFetchSize <= 0) { + throw new IllegalArgumentException(); + } - public HiveStatement(HiveConnection connection, TCLIService.Iface client, - TSessionHandle sessHandle, boolean isScrollableResultset, int fetchSize) { - this.connection = connection; - this.client = client; - this.sessHandle = sessHandle; this.isScrollableResultset = isScrollableResultset; - this.fetchSize = fetchSize; + this.defaultFetchSize = defaultFetchSize; + this.fetchSize = (initFetchSize == 0) ? defaultFetchSize : initFetchSize; } /* @@ -811,12 +813,9 @@ public class HiveStatement implements java.sql.Statement { public void setFetchSize(int rows) throws SQLException { checkConnection("setFetchSize"); if (rows > 0) { - fetchSize = rows; + this.fetchSize = rows; } else if (rows == 0) { - // Javadoc for Statement interface states that if the value is zero - // then "fetch size" hint is ignored. - // In this case it means reverting it to the default value. 
- fetchSize = DEFAULT_FETCH_SIZE; + this.fetchSize = this.defaultFetchSize; } else { throw new SQLException("Fetch size must be greater or equal to 0"); } diff --git a/jdbc/src/test/org/apache/hive/jdbc/TestHiveStatement.java b/jdbc/src/test/org/apache/hive/jdbc/TestHiveStatement.java index eeb6b5d..62a1fbd 100644 --- a/jdbc/src/test/org/apache/hive/jdbc/TestHiveStatement.java +++ b/jdbc/src/test/org/apache/hive/jdbc/TestHiveStatement.java @@ -17,42 +17,127 @@ */ package org.apache.hive.jdbc; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.sql.Statement; -import static org.junit.Assert.assertEquals; +import org.apache.hive.service.rpc.thrift.TCLIService.Iface; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.service.rpc.thrift.TSessionHandle; +import org.junit.Test; public class TestHiveStatement { + /** + * Gives the JDBC driver a hint as to the number of rows that should be + * fetched from the database when more rows are needed for ResultSet objects + * generated by this Statement. If the value specified is zero, then the hint + * is ignored. Test a value greater than zero is accepted. 
+ */ @Test - public void testSetFetchSize1() throws SQLException { - HiveStatement stmt = new HiveStatement(null, null, null); - stmt.setFetchSize(123); - assertEquals(123, stmt.getFetchSize()); + public void testSetFetchSize() throws SQLException { + final HiveConnection connection = mock(HiveConnection.class); + final Iface iface = mock(Iface.class); + final TSessionHandle handle = mock(TSessionHandle.class); + + try (HiveStatement stmt = new HiveStatement(connection, iface, handle)) { + stmt.setFetchSize(123); + assertEquals(123, stmt.getFetchSize()); + } } + /** + * Gives the JDBC driver a hint as to the number of rows that should be + * fetched from the database when more rows are needed for ResultSet objects + * generated by this Statement. If the value specified is zero, then the hint + * is ignored. Test for a value of zero (hint is ignored). + */ @Test - public void testSetFetchSize2() throws SQLException { - HiveStatement stmt = new HiveStatement(null, null, null); - int initial = stmt.getFetchSize(); - stmt.setFetchSize(0); - assertEquals(initial, stmt.getFetchSize()); + public void testSetFetchSizeZero() throws SQLException { + final HiveConnection connection = mock(HiveConnection.class); + final Iface iface = mock(Iface.class); + final TSessionHandle handle = mock(TSessionHandle.class); + + // No hint specified and no default value passed in through the constructor, + // so it falls-back to the configuration default value + try (HiveStatement stmt = new HiveStatement(connection, iface, handle)) { + stmt.setFetchSize(0); + assertEquals(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.defaultIntVal, + stmt.getFetchSize()); + } } - @Test(expected = SQLException.class) - public void testSetFetchSize3() throws SQLException { - HiveStatement stmt = new HiveStatement(null, null, null); - stmt.setFetchSize(-1); + /** + * Gives the JDBC driver a hint as to the number of rows that should be + * fetched from the database when more rows are 
needed for ResultSet objects + * generated by this Statement. If the value specified is zero, then the hint + * is ignored. Test for a value of zero (hint is ignored) and a default is + * specified. + */ + @Test + public void testSetFetchSizeZeroWithDefault() throws SQLException { + final HiveConnection connection = mock(HiveConnection.class); + final Iface iface = mock(Iface.class); + final TSessionHandle handle = mock(TSessionHandle.class); + + // No hint specified, but a default value of 10 is passed in through the constructor, + // so it falls back to that default rather than the configuration value + try (HiveStatement stmt = new HiveStatement(connection, iface, handle, false, 0, 10)) { + stmt.setFetchSize(0); + assertEquals(10, stmt.getFetchSize()); + } } + /** + * Gives the JDBC driver a hint as to the number of rows that should be + * fetched from the database when more rows are needed for ResultSet objects + * generated by this Statement. The fetch size can be passed in through the + * JDBC connection string whereby every created {@code Statement} will start + * with the fetch size specified if no explicit calls are made. + * + * @see Utils.JdbcConnectionParams#FETCH_SIZE + */ @Test + public void testSetFetchSizeJdbcProperty() throws SQLException { + final HiveConnection connection = mock(HiveConnection.class); + final Iface iface = mock(Iface.class); + final TSessionHandle handle = mock(TSessionHandle.class); + + try (HiveStatement stmt = new HiveStatement(connection, iface, handle, false, 4, 1000)) { + assertEquals(4, stmt.getFetchSize()); + } + } + + /** + * Gives the JDBC driver a hint as to the number of rows that should be + * fetched from the database when more rows are needed for ResultSet objects + * generated by this Statement. If the value specified is zero, then the hint + * is ignored. If the hint is less than zero, an error is thrown.
+ * + * @see Statement#setFetchSize(int) + */ + @Test(expected = SQLException.class) + public void testSetFetchSizeNegativeValue() throws SQLException { + final HiveConnection connection = mock(HiveConnection.class); + final Iface iface = mock(Iface.class); + final TSessionHandle handle = mock(TSessionHandle.class); + + try (HiveStatement stmt = new HiveStatement(connection, iface, handle)) { + stmt.setFetchSize(-1); + } + } + + @Test(expected = SQLFeatureNotSupportedException.class) public void testaddBatch() throws SQLException { - HiveStatement stmt = new HiveStatement(null, null, null); - try { + final HiveConnection connection = mock(HiveConnection.class); + final Iface iface = mock(Iface.class); + final TSessionHandle handle = mock(TSessionHandle.class); + + try (HiveStatement stmt = new HiveStatement(connection, iface, handle)) { stmt.addBatch(null); - } catch (SQLException e) { - assertEquals("java.sql.SQLFeatureNotSupportedException: Method not supported", e.toString()); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index a734342..7024910 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -892,11 +892,7 @@ public class Driver implements IDriver { while (numRows < maxRows) { if (driverContext.getResStream() == null) { - if (numRows > 0) { - return true; - } else { - return false; - } + return (numRows > 0); } bos.reset(); diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 4a35cf0..96770f4 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -461,7 +461,7 @@ public class SQLOperation extends ExecuteStatementOperation { maxRows = 1; isBlobBased = true; } - driver.setMaxRows((int) maxRows); 
+ driver.setMaxRows(Math.toIntExact(maxRows)); RowSet rowSet = RowSetFactory.create(getResultSetSchema(), getProtocolVersion(), isBlobBased); try { /* if client is requesting fetch-from-start and its not the first time reading from this operation @@ -471,7 +471,7 @@ public class SQLOperation extends ExecuteStatementOperation { driver.resetFetch(); } fetchStarted = true; - driver.setMaxRows((int) maxRows); + driver.setMaxRows(Math.toIntExact(maxRows)); if (driver.getResults(convey)) { return decode(convey, rowSet); } diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 475b1bb..9e49754 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -183,8 +183,6 @@ public class HiveSessionImpl implements HiveSession { // Process global init file: .hiverc processGlobalInitFile(); - // Set fetch size in session conf map - sessionConfMap = setFetchSize(sessionConfMap); if (sessionConfMap != null) { configureSession(sessionConfMap); @@ -275,22 +273,6 @@ public class HiveSessionImpl implements HiveSession { } } - private Map<String, String> setFetchSize(Map<String, String> sessionConfMap) { - int maxFetchSize = - sessionConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE); - String confFetchSize = sessionConfMap != null ? - sessionConfMap.get( - "set:hiveconf:" + HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.varname) : - null; - if (confFetchSize != null && !confFetchSize.isEmpty()) { - int fetchSize = Integer.parseInt(confFetchSize); - sessionConfMap.put( - "set:hiveconf:" + HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE.varname, - Integer.toString(fetchSize > maxFetchSize ? 
maxFetchSize : fetchSize)); - } - return sessionConfMap; - } - private void configureSession(Map<String, String> sessionConfMap) throws HiveSQLException { SessionState.setCurrentSessionState(sessionState); for (Map.Entry<String, String> entry : sessionConfMap.entrySet()) { diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 8f13fb3..a7fe049 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -791,13 +791,15 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe @Override public TFetchResultsResp FetchResults(TFetchResultsReq req) throws TException { TFetchResultsResp resp = new TFetchResultsResp(); + + final int maxFetchSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE); + if (req.getMaxRows() > maxFetchSize) { + LOG.warn("Fetch Size greater than maximum allowed. Capping fetch size. [req={}, max={}]", req.getMaxRows(), + maxFetchSize); + req.setMaxRows(maxFetchSize); + } + try { - // Set fetch size - int maxFetchSize = - hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE); - if (req.getMaxRows() > maxFetchSize) { - req.setMaxRows(maxFetchSize); - } RowSet rowSet = cliService.fetchResults( new OperationHandle(req.getOperationHandle()), FetchOrientation.getFetchOrientation(req.getOrientation()),