[GitHub] [spark] yaooqinn commented on a diff in pull request #36373: [SPARK-39041][SQL] Mapping Spark Query ResultSet/Schema to TRowSet/TTableSchema directly
yaooqinn commented on code in PR #36373: URL: https://github.com/apache/spark/pull/36373#discussion_r871502997 ## sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSession.java: ## @@ -182,11 +184,11 @@ OperationHandle getCrossReference(String primaryCatalog, void closeOperation(OperationHandle opHandle) throws HiveSQLException; - TableSchema getResultSetMetadata(OperationHandle opHandle) + TTableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException; - RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, - long maxRows, FetchType fetchType) throws HiveSQLException; + TRowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, + long maxRows, FetchType fetchType) throws HiveSQLException; Review Comment: ```suggestion long maxRows, FetchType fetchType) throws HiveSQLException; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] yaooqinn commented on a diff in pull request #36373: [SPARK-39041][SQL] Mapping Spark Query ResultSet/Schema to TRowSet/TTableSchema directly
yaooqinn commented on code in PR #36373: URL: https://github.com/apache/spark/pull/36373#discussion_r871496811 ## sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala: ## @@ -373,17 +306,69 @@ private[hive] class SparkExecuteStatementOperation( } object SparkExecuteStatementOperation { - def getTableSchema(structType: StructType): TableSchema = { -val schema = structType.map { field => - val attrTypeString = field.dataType match { -case CalendarIntervalType => StringType.catalogString -case _: YearMonthIntervalType => "interval_year_month" -case _: DayTimeIntervalType => "interval_day_time" -case _: TimestampNTZType => "timestamp" -case other => other.catalogString - } - new FieldSchema(field.name, attrTypeString, field.getComment.getOrElse("")) + + def toTTypeId(typ: DataType): TTypeId = typ match { +case NullType => TTypeId.NULL_TYPE +case BooleanType => TTypeId.BOOLEAN_TYPE +case ByteType => TTypeId.TINYINT_TYPE +case ShortType => TTypeId.SMALLINT_TYPE +case IntegerType => TTypeId.INT_TYPE +case LongType => TTypeId.BIGINT_TYPE +case FloatType => TTypeId.FLOAT_TYPE +case DoubleType => TTypeId.DOUBLE_TYPE +case StringType => TTypeId.STRING_TYPE +case _: DecimalType => TTypeId.DECIMAL_TYPE +case DateType => TTypeId.DATE_TYPE +// TODO: Shall use TIMESTAMPLOCALTZ_TYPE, keep AS-IS now for +// unnecessary behavior change +case TimestampType => TTypeId.TIMESTAMP_TYPE +case TimestampNTZType => TTypeId.TIMESTAMP_TYPE Review Comment: this is for rowset schema. jdbc client side can call getString to get the raw string or getObject to get a Java object where it is used -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] yaooqinn commented on a diff in pull request #36373: [SPARK-39041][SQL] Mapping Spark Query ResultSet/Schema to TRowSet/TTableSchema directly
yaooqinn commented on code in PR #36373: URL: https://github.com/apache/spark/pull/36373#discussion_r861445508 ## sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/RowSetUtils.scala: ## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.thriftserver + +import java.nio.ByteBuffer + +import scala.collection.JavaConverters._ +import scala.language.implicitConversions + +import org.apache.hive.service.rpc.thrift._ + +import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.HiveResult.{toHiveString, TimeFormatters} +import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DataType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType} + +object RowSetUtils { + + implicit def bitSetToBuffer(bitSet: java.util.BitSet): ByteBuffer = { +ByteBuffer.wrap(bitSet.toByteArray) + } + + def toTRowSet( + startRowOffSet: Long, + rows: Seq[Row], + schema: Array[DataType], + protocolVersion: TProtocolVersion, + timeFormatters: TimeFormatters): TRowSet = { +if (protocolVersion.getValue < TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) { + toRowBasedSet(startRowOffSet, rows, schema, timeFormatters) +} else { + toColumnBasedSet(startRowOffSet, rows, schema, timeFormatters) +} + } + + private def toRowBasedSet( + startRowOffSet: Long, + rows: Seq[Row], + schema: Array[DataType], + timeFormatters: TimeFormatters): TRowSet = { +var i = 0 +val rowSize = rows.length +val tRows = new java.util.ArrayList[TRow](rowSize) +while (i < rowSize) { + val row = rows(i) + val tRow = new TRow() + var j = 0 + val columnSize = row.length + while (j < columnSize) { +val columnValue = toTColumnValue(j, row, schema(j), timeFormatters) +tRow.addToColVals(columnValue) +j += 1 + } + i += 1 + tRows.add(tRow) +} +new TRowSet(startRowOffSet, tRows) + } + + private def toColumnBasedSet( + startRowOffSet: Long, + rows: Seq[Row], + schema: Array[DataType], + timeFormatters: TimeFormatters): TRowSet = { +val rowSize = rows.length +val tRowSet = new TRowSet(startRowOffSet, new java.util.ArrayList[TRow](rowSize)) +var i = 0 +val columnSize = schema.length +while (i < columnSize) { + val tColumn = toTColumn(rows, i, schema(i), timeFormatters) + 
tRowSet.addToColumns(tColumn) + i += 1 +} +tRowSet + } + + private def toTColumn( + rows: Seq[Row], ordinal: Int, typ: DataType, timeFormatters: TimeFormatters): TColumn = { +val nulls = new java.util.BitSet() +typ match { + case BooleanType => +val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true) +TColumn.boolVal(new TBoolColumn(values, nulls)) + + case ByteType => +val values = getOrSetAsNull[java.lang.Byte](rows, ordinal, nulls, 0.toByte) +TColumn.byteVal(new TByteColumn(values, nulls)) + + case ShortType => +val values = getOrSetAsNull[java.lang.Short](rows, ordinal, nulls, 0.toShort) +TColumn.i16Val(new TI16Column(values, nulls)) + + case IntegerType => +val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0) +TColumn.i32Val(new TI32Column(values, nulls)) + + case LongType => +val values = getOrSetAsNull[java.lang.Long](rows, ordinal, nulls, 0L) +TColumn.i64Val(new TI64Column(values, nulls)) + + case FloatType => +val values = getOrSetAsNull[java.lang.Float](rows, ordinal, nulls, 0.toFloat) + .asScala.map(n => java.lang.Double.valueOf(n.toString)).asJava +TColumn.doubleVal(new TDoubleColumn(values, nulls)) + + case DoubleType => +val values = getOrSetAsNull[java.lang.Double](rows, ordinal, nulls, 0.toDouble) +TColumn.doubleVal(new TDoubleColumn(values, nulls)) + + case StringType => +val values = getOrSetAsNull[java.lang.String](rows, ordinal, nulls, "") +TColumn.stringVal(new TStringColumn(values, nulls)) + + case BinaryType => +val values =
[GitHub] [spark] yaooqinn commented on a diff in pull request #36373: [SPARK-39041][SQL] Mapping Spark Query ResultSet/Schema to TRowSet/TTableSchema directly
yaooqinn commented on code in PR #36373: URL: https://github.com/apache/spark/pull/36373#discussion_r860912047 ## sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/RowSetUtils.scala: ## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.thriftserver + +import java.nio.ByteBuffer + +import scala.collection.JavaConverters._ +import scala.language.implicitConversions + +import org.apache.hive.service.rpc.thrift._ + +import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.HiveResult.{toHiveString, TimeFormatters} +import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DataType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType} + +object RowSetUtils { + + implicit def bitSetToBuffer(bitSet: java.util.BitSet): ByteBuffer = { +ByteBuffer.wrap(bitSet.toByteArray) + } + + def toTRowSet( + startRowOffSet: Long, + rows: Seq[Row], + schema: Array[DataType], + protocolVersion: TProtocolVersion, + timeFormatters: TimeFormatters): TRowSet = { +if (protocolVersion.getValue < TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) { + toRowBasedSet(startRowOffSet, rows, schema, timeFormatters) +} else { + toColumnBasedSet(startRowOffSet, rows, schema, timeFormatters) +} + } + + private def toRowBasedSet( + startRowOffSet: Long, + rows: Seq[Row], + schema: Array[DataType], + timeFormatters: TimeFormatters): TRowSet = { +var i = 0 +val rowSize = rows.length +val tRows = new java.util.ArrayList[TRow](rowSize) +while (i < rowSize) { + val row = rows(i) + val tRow = new TRow() + var j = 0 + val columnSize = row.length + while (j < columnSize) { +val columnValue = toTColumnValue(j, row, schema(j), timeFormatters) +tRow.addToColVals(columnValue) +j += 1 + } + i += 1 + tRows.add(tRow) +} +new TRowSet(startRowOffSet, tRows) + } + + private def toColumnBasedSet( + startRowOffSet: Long, + rows: Seq[Row], + schema: Array[DataType], + timeFormatters: TimeFormatters): TRowSet = { +val rowSize = rows.length +val tRowSet = new TRowSet(startRowOffSet, new java.util.ArrayList[TRow](rowSize)) +var i = 0 +val columnSize = schema.length +while (i < columnSize) { + val tColumn = toTColumn(rows, i, schema(i), timeFormatters) + 
tRowSet.addToColumns(tColumn) + i += 1 +} +tRowSet + } + + private def toTColumn( + rows: Seq[Row], ordinal: Int, typ: DataType, timeFormatters: TimeFormatters): TColumn = { +val nulls = new java.util.BitSet() +typ match { + case BooleanType => +val values = getOrSetAsNull[java.lang.Boolean](rows, ordinal, nulls, true) +TColumn.boolVal(new TBoolColumn(values, nulls)) + + case ByteType => +val values = getOrSetAsNull[java.lang.Byte](rows, ordinal, nulls, 0.toByte) +TColumn.byteVal(new TByteColumn(values, nulls)) + + case ShortType => +val values = getOrSetAsNull[java.lang.Short](rows, ordinal, nulls, 0.toShort) +TColumn.i16Val(new TI16Column(values, nulls)) + + case IntegerType => +val values = getOrSetAsNull[java.lang.Integer](rows, ordinal, nulls, 0) +TColumn.i32Val(new TI32Column(values, nulls)) + + case LongType => +val values = getOrSetAsNull[java.lang.Long](rows, ordinal, nulls, 0L) +TColumn.i64Val(new TI64Column(values, nulls)) + + case FloatType => +val values = getOrSetAsNull[java.lang.Float](rows, ordinal, nulls, 0.toFloat) + .asScala.map(n => java.lang.Double.valueOf(n.toString)).asJava +TColumn.doubleVal(new TDoubleColumn(values, nulls)) + + case DoubleType => +val values = getOrSetAsNull[java.lang.Double](rows, ordinal, nulls, 0.toDouble) +TColumn.doubleVal(new TDoubleColumn(values, nulls)) + + case StringType => +val values = getOrSetAsNull[java.lang.String](rows, ordinal, nulls, "") +TColumn.stringVal(new TStringColumn(values, nulls)) + + case BinaryType => +val values =
[GitHub] [spark] yaooqinn commented on a diff in pull request #36373: [SPARK-39041][SQL] Mapping Spark Query ResultSet/Schema to TRowSet/TTableSchema directly
yaooqinn commented on code in PR #36373: URL: https://github.com/apache/spark/pull/36373#discussion_r859583552 ## sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/CLIServiceClient.java: ## @@ -35,7 +36,7 @@ public SessionHandle openSession(String username, String password) } @Override - public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException { + public TRowSet fetchResults(OperationHandle opHandle) throws HiveSQLException { Review Comment: CLIServiceClient and its children are used in our tests only; changes here will not have any impact on real-world clients -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] yaooqinn commented on a diff in pull request #36373: [SPARK-39041][SQL] Mapping Spark Query ResultSet/Schema to TRowSet/TTableSchema directly
yaooqinn commented on code in PR #36373: URL: https://github.com/apache/spark/pull/36373#discussion_r859578656 ## sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala: ## @@ -373,17 +306,69 @@ private[hive] class SparkExecuteStatementOperation( } object SparkExecuteStatementOperation { - def getTableSchema(structType: StructType): TableSchema = { -val schema = structType.map { field => - val attrTypeString = field.dataType match { -case CalendarIntervalType => StringType.catalogString -case _: YearMonthIntervalType => "interval_year_month" -case _: DayTimeIntervalType => "interval_day_time" -case _: TimestampNTZType => "timestamp" -case other => other.catalogString - } - new FieldSchema(field.name, attrTypeString, field.getComment.getOrElse("")) + + def toTTypeId(typ: DataType): TTypeId = typ match { +case NullType => TTypeId.NULL_TYPE +case BooleanType => TTypeId.BOOLEAN_TYPE +case ByteType => TTypeId.TINYINT_TYPE +case ShortType => TTypeId.SMALLINT_TYPE +case IntegerType => TTypeId.INT_TYPE +case LongType => TTypeId.BIGINT_TYPE +case FloatType => TTypeId.FLOAT_TYPE +case DoubleType => TTypeId.DOUBLE_TYPE +case StringType => TTypeId.STRING_TYPE +case _: DecimalType => TTypeId.DECIMAL_TYPE +case DateType => TTypeId.DATE_TYPE +// TODO: Shall use TIMESTAMPLOCALTZ_TYPE, keep AS-IS now for Review Comment: TimestampType keeps being converted to TTypeId.TIMESTAMP_TYPE in this PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] yaooqinn commented on a diff in pull request #36373: [SPARK-39041][SQL] Mapping Spark Query ResultSet/Schema to TRowSet/TTableSchema directly
yaooqinn commented on code in PR #36373: URL: https://github.com/apache/spark/pull/36373#discussion_r859577346 ## sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala: ## @@ -373,17 +306,69 @@ private[hive] class SparkExecuteStatementOperation( } object SparkExecuteStatementOperation { - def getTableSchema(structType: StructType): TableSchema = { -val schema = structType.map { field => - val attrTypeString = field.dataType match { -case CalendarIntervalType => StringType.catalogString -case _: YearMonthIntervalType => "interval_year_month" -case _: DayTimeIntervalType => "interval_day_time" -case _: TimestampNTZType => "timestamp" -case other => other.catalogString - } - new FieldSchema(field.name, attrTypeString, field.getComment.getOrElse("")) + + def toTTypeId(typ: DataType): TTypeId = typ match { +case NullType => TTypeId.NULL_TYPE +case BooleanType => TTypeId.BOOLEAN_TYPE +case ByteType => TTypeId.TINYINT_TYPE +case ShortType => TTypeId.SMALLINT_TYPE +case IntegerType => TTypeId.INT_TYPE +case LongType => TTypeId.BIGINT_TYPE +case FloatType => TTypeId.FLOAT_TYPE +case DoubleType => TTypeId.DOUBLE_TYPE +case StringType => TTypeId.STRING_TYPE +case _: DecimalType => TTypeId.DECIMAL_TYPE +case DateType => TTypeId.DATE_TYPE +// TODO: Shall use TIMESTAMPLOCALTZ_TYPE, keep AS-IS now for +// unnecessary behavior change +case TimestampType => TTypeId.TIMESTAMP_TYPE +case TimestampNTZType => TTypeId.TIMESTAMP_TYPE +case BinaryType => TTypeId.BINARY_TYPE +case CalendarIntervalType => TTypeId.STRING_TYPE +case _: DayTimeIntervalType => TTypeId.INTERVAL_DAY_TIME_TYPE +case _: YearMonthIntervalType => TTypeId.INTERVAL_YEAR_MONTH_TYPE +case _: ArrayType => TTypeId.ARRAY_TYPE +case _: MapType => TTypeId.MAP_TYPE +case _: StructType => TTypeId.STRUCT_TYPE +case other => + throw new IllegalArgumentException(s"Unrecognized type name: ${other.catalogString}") + } + + private def toTTypeQualifiers(typ: DataType): 
TTypeQualifiers = { +val ret = new TTypeQualifiers() +val qualifiers = typ match { + case d: DecimalType => +Map( + TCLIServiceConstants.PRECISION -> TTypeQualifierValue.i32Value(d.precision), + TCLIServiceConstants.SCALE -> TTypeQualifierValue.i32Value(d.scale)).asJava + case _ => Collections.emptyMap[String, TTypeQualifierValue]() Review Comment: if char/varchar can be seen via resultset metadata, we shall also provide the char length here for the client to take advantage of it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org