This is an automated email from the ASF dual-hosted git repository. lixiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new adb506a [SPARK-28852][SQL] Implement SparkGetCatalogsOperation for Thrift Server adb506a is described below commit adb506afd783d24d397c73c34c7fed89563c0a6b Author: Yuming Wang <yumw...@ebay.com> AuthorDate: Sun Aug 25 22:42:50 2019 -0700 [SPARK-28852][SQL] Implement SparkGetCatalogsOperation for Thrift Server ### What changes were proposed in this pull request? This PR implements `SparkGetCatalogsOperation` for Thrift Server metadata completeness. ### Why are the changes needed? Thrift Server metadata completeness. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Unit test Closes #25555 from wangyum/SPARK-28852. Authored-by: Yuming Wang <yumw...@ebay.com> Signed-off-by: Xiao Li <gatorsm...@gmail.com> --- .../thriftserver/SparkGetCatalogsOperation.scala | 79 ++++++++++++++++++++++ .../server/SparkSQLOperationManager.scala | 11 +++ .../thriftserver/SparkMetadataOperationSuite.scala | 8 +++ .../cli/operation/GetCatalogsOperation.java | 2 +- .../cli/operation/GetCatalogsOperation.java | 2 +- 5 files changed, 100 insertions(+), 2 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala new file mode 100644 index 0000000..cde99fd --- /dev/null +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.thriftserver + +import java.util.UUID + +import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType +import org.apache.hive.service.cli.{HiveSQLException, OperationState} +import org.apache.hive.service.cli.operation.GetCatalogsOperation +import org.apache.hive.service.cli.session.HiveSession + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLContext +import org.apache.spark.util.{Utils => SparkUtils} + +/** + * Spark's own GetCatalogsOperation + * + * @param sqlContext SQLContext to use + * @param parentSession a HiveSession from SessionManager + */ +private[hive] class SparkGetCatalogsOperation( + sqlContext: SQLContext, + parentSession: HiveSession) + extends GetCatalogsOperation(parentSession) with Logging { + + private var statementId: String = _ + + override def close(): Unit = { + super.close() + HiveThriftServer2.listener.onOperationClosed(statementId) + } + + override def runInternal(): Unit = { + statementId = UUID.randomUUID().toString + val logMsg = "Listing catalogs" + logInfo(s"$logMsg with $statementId") + setState(OperationState.RUNNING) + // Always use the latest class loader provided by executionHive's state. + val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader + Thread.currentThread().setContextClassLoader(executionHiveClassLoader) + + HiveThriftServer2.listener.onStatementStart( + statementId, + parentSession.getSessionHandle.getSessionId.toString, + logMsg, + statementId, + parentSession.getUsername) + + try { + if (isAuthV2Enabled) { + authorizeMetaGets(HiveOperationType.GET_CATALOGS, null) + } + setState(OperationState.FINISHED) + } catch { + case e: HiveSQLException => + setState(OperationState.ERROR) + HiveThriftServer2.listener.onStatementError( + statementId, e.getMessage, SparkUtils.exceptionString(e)) + throw e + } + HiveThriftServer2.listener.onStatementFinish(statementId) + } +} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index dfcd333..35f9254 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -63,6 +63,17 @@ private[thriftserver] class SparkSQLOperationManager() operation } + override def newGetCatalogsOperation( + parentSession: HiveSession): GetCatalogsOperation = synchronized { + val sqlContext = sessionToContexts.get(parentSession.getSessionHandle) + require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + + " initialized or had already closed.") + val operation = new SparkGetCatalogsOperation(sqlContext, parentSession) + handleToOperation.put(operation.getHandle, operation) + logDebug(s"Created GetCatalogsOperation with session=$parentSession.") + operation + } + override def newGetSchemasOperation( parentSession: HiveSession, catalogName: String, diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala index 45fe8a8..21870ff 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala @@ -223,4 +223,12 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest { assert(!rs.next()) } } + + test("Spark's own GetCatalogsOperation(SparkGetCatalogsOperation)") { + withJdbcStatement() { statement => + val metaData = statement.getConnection.getMetaData + val rs = metaData.getCatalogs + assert(!rs.next()) + } + } } diff --git a/sql/hive-thriftserver/v1.2.1/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java b/sql/hive-thriftserver/v1.2.1/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java index 8868ec1..581d975 100644 --- a/sql/hive-thriftserver/v1.2.1/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java +++ b/sql/hive-thriftserver/v1.2.1/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java @@ -36,7 +36,7 @@ public class GetCatalogsOperation extends MetadataOperation { private static final TableSchema RESULT_SET_SCHEMA = new TableSchema() .addStringColumn("TABLE_CAT", "Catalog name. NULL if not applicable."); - private final RowSet rowSet; + protected final RowSet rowSet; protected GetCatalogsOperation(HiveSession parentSession) { super(parentSession, OperationType.GET_CATALOGS); diff --git a/sql/hive-thriftserver/v2.3.5/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java b/sql/hive-thriftserver/v2.3.5/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java index b85d5c0..4d15022 100644 --- a/sql/hive-thriftserver/v2.3.5/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java +++ b/sql/hive-thriftserver/v2.3.5/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java @@ -36,7 +36,7 @@ public class GetCatalogsOperation extends MetadataOperation { private static final TableSchema RESULT_SET_SCHEMA = new TableSchema() .addStringColumn("TABLE_CAT", "Catalog name. NULL if not applicable."); - private final RowSet rowSet; + protected final RowSet rowSet; protected GetCatalogsOperation(HiveSession parentSession) { super(parentSession, OperationType.GET_CATALOGS); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org