[GitHub] spark issue #22827: [SPARK-25832][SQL][BRANCH-2.4] Revert newly added map re...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22827 thanks, merging to 2.4! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22029: [SPARK-24395][SQL] IN operator should return NULL...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22029#discussion_r228245697 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala --- @@ -202,7 +209,11 @@ case class InSubquery(values: Seq[Expression], query: ListQuery) */ // scalastyle:off line.size.limit @ExpressionDescription( - usage = "expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals to any valN.", + usage = """ +expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals to any valN. Otherwise, if + spark.sql.legacy.inOperator.falseForNullField is false and any of the elements or fields of + the elements is null it returns null, else it returns false. --- End diff -- I vaguely remember that multi-line string doesn't work with `ExpressionDescription`. Can you verify it with `DESCRIBE FUNCTION`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
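One reason a triple-quoted `usage` string may render differently from the single-line form: a Scala multi-line literal keeps its line breaks and leading indentation as part of the value. The following is a minimal plain-Scala sketch (no Spark involved; the object and value names are made up for illustration) comparing a single-line literal, a raw multi-line literal, and one normalized with `stripMargin`:

```scala
object UsageStringSketch {
  // Single-line form, as in the original annotation.
  val singleLine: String =
    "expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals to any valN."

  // Triple-quoted form: the newlines and the indentation are part of the value.
  val multiLine: String = """
    expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals to any valN.
  """

  // `stripMargin` drops everything up to and including the leading `|` on each line;
  // `trim` then removes the surrounding blank lines.
  val normalized: String =
    """
      |expr1 _FUNC_(expr2, expr3, ...) - Returns true if `expr` equals to any valN.
      |""".stripMargin.trim
}
```

Whether `DESCRIBE FUNCTION` output is affected still has to be checked against a running Spark build; the sketch only shows what the string value itself contains.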
[GitHub] spark issue #22814: [SPARK-25819][SQL] Support parse mode option for the fun...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22814 LGTM
[GitHub] spark issue #22812: [SPARK-25817][SQL] Dataset encoder should support combin...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22812 retest this please
[GitHub] spark pull request #22809: [SPARK-19851][SQL] Add support for EVERY and ANY ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22809#discussion_r228238508 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Max.scala --- @@ -57,3 +57,34 @@ case class Max(child: Expression) extends DeclarativeAggregate { override lazy val evaluateExpression: AttributeReference = max } + +abstract class AnyAggBase(arg: Expression) --- End diff -- If it's hard to decide where to put it, I think putting it in a new file can be considered. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22809: [SPARK-19851][SQL] Add support for EVERY and ANY ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22809#discussion_r228238276 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Max.scala --- @@ -57,3 +57,34 @@ case class Max(child: Expression) extends DeclarativeAggregate { override lazy val evaluateExpression: AttributeReference = max } + +abstract class AnyAggBase(arg: Expression) + extends UnevaluableAggrgate with ImplicitCastInputTypes { + + override def children: Seq[Expression] = arg :: Nil + + override def dataType: DataType = BooleanType + + override def inputTypes: Seq[AbstractDataType] = Seq(BooleanType) + + override def checkInputDataTypes(): TypeCheckResult = { +arg.dataType match { + case dt if dt != BooleanType => +TypeCheckResult.TypeCheckFailure(s"Input to function '$prettyName' should have been " + + s"${BooleanType.simpleString}, but it's [${arg.dataType.catalogString}].") + case _ => TypeCheckResult.TypeCheckSuccess +} + } +} + +@ExpressionDescription( + usage = "_FUNC_(expr) - Returns true if at least one value of `expr` is true.") --- End diff -- Ideally we need `Since` here. Some functions don't have them because at that time the `Since` method was not there. We should add missing `Since` to them as well, if other people have time to do it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22675: [SPARK-25347][ML][DOC] Spark datasource for image/libsvm...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22675 thanks, merging to master/2.4!
[GitHub] spark issue #22790: [SPARK-25793][ML]call SaveLoadV2_0.load for classNameV2_...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22790 Is this ready to go? We are going to have another RC, and it would be good to include it.
[GitHub] spark issue #22775: [SPARK-24709][SQL][FOLLOW-UP] Make schema_of_json's inpu...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22775 I think I'm not qualified to make the decision here, as I don't fully understand the use case. It looks to me that one use case would be to run `schema_of_json` on a column, manually figure out what the final schema is, and then use this schema in `from_json`. If `schema_of_string` only accepts a literal, I'm not sure how users would use it.
[GitHub] spark issue #22825: [SPARK-25772][SQL][FOLLOWUP] remove GetArrayFromMap
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22825 cc @vofque @viirya
[GitHub] spark pull request #22825: [SPARK-25772][SQL][FOLLOWUP] remove GetArrayFromM...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/22825 [SPARK-25772][SQL][FOLLOWUP] remove GetArrayFromMap ## What changes were proposed in this pull request? In https://github.com/apache/spark/pull/22745 we introduced the `GetArrayFromMap` expression. Later on I realized this is duplicated as we already have `MapKeys` and `MapValues`. This PR removes `GetArrayFromMap` ## How was this patch tested? existing tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark minor Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22825.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22825 commit a6c6faa60d8846fb50845839905fc0b938046e02 Author: Wenchen Fan Date: 2018-10-25T14:29:50Z remove GetArrayFromMap --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22812: [SPARK-25817][SQL] Dataset encoder should support...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22812#discussion_r228195016 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala --- @@ -119,10 +119,9 @@ object ExpressionEncoder { } val childrenDeserializers = encoders.zipWithIndex.map { case (enc, index) => - val getColumnsByOrdinals = enc.objDeserializer.collect { case c: GetColumnByOrdinal => c } -.distinct - assert(getColumnsByOrdinals.size == 1, "object deserializer should have only one " + -s"`GetColumnByOrdinal`, but there are ${getColumnsByOrdinals.size}") + val getColExprs = enc.objDeserializer.collect { case c: GetColumnByOrdinal => c }.distinct --- End diff -- unrelated but to fix minor code style issues in https://github.com/apache/spark/pull/22749 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22821: [SPARK-25832][SQL] remove newly added map related functi...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22821 @dongjoon-hyun if there are advanced users who know all the background, and still want to use these functions, why shall we stop them? If end users can't hit the bug with public APIs, I think we are fine.
[GitHub] spark pull request #22814: [SPARK-25819][SQL] Support parse mode option for ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22814#discussion_r228164819 --- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala --- @@ -167,4 +184,26 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalH // avro reader reads the first 4 bytes of a double as a float, the result is totally undefined. checkResult(data, avroTypeJson, 5.848603E35f) } + + test("Handle unsupported input of record type") { +val actualSchema = StructType(Seq( + StructField("col_0", StringType, false), + StructField("col_1", ShortType, false), + StructField("col_2", DecimalType(8, 4), false), + StructField("col_3", BooleanType, true), + StructField("col_4", DecimalType(38, 38), false))) + +val expectedSchema = StructType(Seq( + StructField("col_0", BinaryType, false), + StructField("col_1", DoubleType, false), + StructField("col_2", DecimalType(18, 4), false), + StructField("col_3", StringType, true), + StructField("col_4", DecimalType(38, 38), false))) + +val data = RandomDataGenerator.randomRow(new scala.util.Random, actualSchema) --- End diff -- let's include the seed with `withClue`, so that people can reproduce test failures --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
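The suggestion above can be sketched without any test framework. This is an illustrative plain-Scala version, not the code from the PR: `withClueSketch` is a hypothetical stand-in for ScalaTest's `withClue`, and `randomInts` stands in for the random row generator. The point is that the seed is chosen explicitly and attached to every failure message, so a failing run can be replayed.

```scala
import scala.util.Random

object SeededTestSketch {
  // Hypothetical stand-in for ScalaTest's `withClue`: prefixes assertion failures with a clue.
  def withClueSketch[T](clue: String)(body: => T): T =
    try body catch {
      case e: AssertionError => throw new AssertionError(s"$clue ${e.getMessage}", e)
    }

  // Generates test data from an explicit seed so that a run can be replayed exactly.
  def randomInts(seed: Long, n: Int): Seq[Int] = {
    val rng = new Random(seed)
    Seq.fill(n)(rng.nextInt(100))
  }

  def main(args: Array[String]): Unit = {
    // Pick the seed up front instead of letting `new Random` choose a hidden one.
    val seed = System.nanoTime()
    withClueSketch(s"(seed = $seed)") {
      val data = randomInts(seed, 5)
      assert(data.forall(x => x >= 0 && x < 100), s"unexpected value in $data")
    }
  }
}
```

In a ScalaTest suite the same shape would be `withClue(s"seed = $seed") { ... }` around the assertions, with the seed passed to `new scala.util.Random(seed)`.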
[GitHub] spark pull request #22814: [SPARK-25819][SQL] Support parse mode option for ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22814#discussion_r228164087 --- Diff: docs/sql-migration-guide-upgrade.md --- @@ -11,6 +11,10 @@ displayTitle: Spark SQL Upgrading Guide - In PySpark, when creating a `SparkSession` with `SparkSession.builder.getOrCreate()`, if there is an existing `SparkContext`, the builder was trying to update the `SparkConf` of the existing `SparkContext` with configurations specified to the builder, but the `SparkContext` is shared by all `SparkSession`s, so we should not update them. Since 3.0, the builder comes to not update the configurations. This is the same behavior as Java/Scala API in 2.3 and above. If you want to update them, you need to update them prior to creating a `SparkSession`. + - In Avro data source, the function `from_avro` supports following parse modes: +* `PERMISSIVE`: Corrupt records are processed as null result. To implement this, the data schema is forced to be fully nullable, which might be different from the one user provided. This is the default mode. +* `FAILFAST`: Throws an exception on processing corrupted record. --- End diff -- Let's explain what changes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22814: [SPARK-25819][SQL] Support parse mode option for ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22814#discussion_r228162464 --- Diff: docs/sql-migration-guide-upgrade.md --- @@ -11,6 +11,10 @@ displayTitle: Spark SQL Upgrading Guide - In PySpark, when creating a `SparkSession` with `SparkSession.builder.getOrCreate()`, if there is an existing `SparkContext`, the builder was trying to update the `SparkConf` of the existing `SparkContext` with configurations specified to the builder, but the `SparkContext` is shared by all `SparkSession`s, so we should not update them. Since 3.0, the builder comes to not update the configurations. This is the same behavior as Java/Scala API in 2.3 and above. If you want to update them, you need to update them prior to creating a `SparkSession`. + - In Avro data source, the function `from_avro` supports following parse modes: +* `PERMISSIVE`: Corrupt records are processed as null result. To implement this, the data schema is forced to be fully nullable, which might be different from the one user provided. This is the default mode. +* `FAILFAST`: Throws an exception on processing corrupted record. --- End diff -- We don't change existing APIs; we add a new `from_avro` method that takes an extra `option` parameter. Users won't hit any problem when upgrading Spark, and ideally they should read the release notes and use this new feature if they need it. I don't think we need to put it in the migration guide. Let's not abuse the migration guide.
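The compatibility argument here (an added overload leaves existing call sites untouched) can be illustrated with a small sketch. It uses toy types rather than Spark's `Column` and `AvroDataToCatalyst`; `Decode` and `fromAvro` are hypothetical names chosen for this example only.

```scala
object FromAvroOverloadSketch {
  // Toy stand-in for the resulting expression; not Spark's real type.
  final case class Decode(column: String, schema: String, options: Map[String, String])

  // Existing two-argument entry point: signature unchanged, delegates with no options.
  def fromAvro(column: String, schema: String): Decode =
    fromAvro(column, schema, Map.empty[String, String])

  // New overload that additionally accepts parse options such as "mode".
  def fromAvro(column: String, schema: String, options: Map[String, String]): Decode =
    Decode(column, schema, options)
}
```

Callers of the two-argument form compile and behave as before, which is why such a change arguably belongs in release notes rather than in a migration guide.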
[GitHub] spark pull request #22814: [SPARK-25819][SQL] Support parse mode option for ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22814#discussion_r228160490 --- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala --- @@ -61,6 +59,24 @@ class AvroFunctionsSuite extends QueryTest with SharedSQLContext { checkAnswer(avroStructDF.select(from_avro('avro, avroTypeStruct)), df) } + test("handle invalid input in from_avro") { +val count = 10 +val df = spark.range(count).select(struct('id, 'id.as("id2")).as("struct")) +val avroStructDF = df.select(to_avro('struct).as("avro")) +val avroTypeStruct = s""" + |{ + | "type": "record", + | "name": "struct", + | "fields": [ + |{"name": "col1", "type": "long"}, + |{"name": "col2", "type": "double"} + | ] + |} +""".stripMargin +val expected = (0 until count).map(_ => Row(Row(null, null))) +checkAnswer(avroStructDF.select(from_avro('avro, avroTypeStruct)), expected) --- End diff -- > Also, one good thing about PERMISSIVE mode is that we allow to fill invalid records at columnNameOfCorruptRecord yea Avro can't do it. But returning null instead of failing should still be a good thing for `from_avro`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder to get ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22749 thanks, merging to master!
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r228134985 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala --- @@ -212,21 +181,91 @@ object ExpressionEncoder { * A generic encoder for JVM objects that uses Catalyst Expressions for a `serializer` * and a `deserializer`. * - * @param schema The schema after converting `T` to a Spark SQL row. - * @param serializer A set of expressions, one for each top-level field that can be used to - * extract the values from a raw object into an [[InternalRow]]. - * @param deserializer An expression that will construct an object given an [[InternalRow]]. + * @param objSerializer An expression that can be used to encode a raw object to corresponding + * Spark SQL representation that can be a primitive column, array, map or a + * struct. This represents how Spark SQL generally serializes an object of + * type `T`. + * @param objDeserializer An expression that will construct an object given a Spark SQL + *representation. This represents how Spark SQL generally deserializes + *a serialized value in Spark SQL representation back to an object of + *type `T`. * @param clsTag A classtag for `T`. */ case class ExpressionEncoder[T]( -schema: StructType, -flat: Boolean, -serializer: Seq[Expression], -deserializer: Expression, +objSerializer: Expression, +objDeserializer: Expression, clsTag: ClassTag[T]) extends Encoder[T] { - if (flat) require(serializer.size == 1) + /** + * A sequence of expressions, one for each top-level field that can be used to + * extract the values from a raw object into an [[InternalRow]]: + * 1. If `serializer` encodes a raw object to a struct, strip the outer If-IsNull and get + *the `CreateNamedStruct`. + * 2. For other cases, wrap the single serializer with `CreateNamedStruct`. 
+ */ + val serializer: Seq[NamedExpression] = { +val clsName = Utils.getSimpleName(clsTag.runtimeClass) + +if (isSerializedAsStruct) { + val nullSafeSerializer = objSerializer.transformUp { +case r: BoundReference => + // For input object of Product type, we can't encode it to row if it's null, as Spark SQL + // doesn't allow top-level row to be null, only its columns can be null. + AssertNotNull(r, Seq("top level Product or row object")) + } + nullSafeSerializer match { +case If(_: IsNull, _, s: CreateNamedStruct) => s +case s: CreateNamedStruct => s --- End diff -- this is minor, we can update it in another PR. We don't need to wait for another jenkins QA round. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r228132840 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala --- @@ -212,21 +181,91 @@ object ExpressionEncoder { * A generic encoder for JVM objects that uses Catalyst Expressions for a `serializer` * and a `deserializer`. * - * @param schema The schema after converting `T` to a Spark SQL row. - * @param serializer A set of expressions, one for each top-level field that can be used to - * extract the values from a raw object into an [[InternalRow]]. - * @param deserializer An expression that will construct an object given an [[InternalRow]]. + * @param objSerializer An expression that can be used to encode a raw object to corresponding + * Spark SQL representation that can be a primitive column, array, map or a + * struct. This represents how Spark SQL generally serializes an object of + * type `T`. + * @param objDeserializer An expression that will construct an object given a Spark SQL + *representation. This represents how Spark SQL generally deserializes + *a serialized value in Spark SQL representation back to an object of + *type `T`. * @param clsTag A classtag for `T`. */ case class ExpressionEncoder[T]( -schema: StructType, -flat: Boolean, -serializer: Seq[Expression], -deserializer: Expression, +objSerializer: Expression, +objDeserializer: Expression, clsTag: ClassTag[T]) extends Encoder[T] { - if (flat) require(serializer.size == 1) + /** + * A sequence of expressions, one for each top-level field that can be used to + * extract the values from a raw object into an [[InternalRow]]: + * 1. If `serializer` encodes a raw object to a struct, strip the outer If-IsNull and get + *the `CreateNamedStruct`. + * 2. For other cases, wrap the single serializer with `CreateNamedStruct`. 
+ */ + val serializer: Seq[NamedExpression] = { +val clsName = Utils.getSimpleName(clsTag.runtimeClass) + +if (isSerializedAsStruct) { + val nullSafeSerializer = objSerializer.transformUp { +case r: BoundReference => + // For input object of Product type, we can't encode it to row if it's null, as Spark SQL + // doesn't allow top-level row to be null, only its columns can be null. + AssertNotNull(r, Seq("top level Product or row object")) + } + nullSafeSerializer match { +case If(_: IsNull, _, s: CreateNamedStruct) => s +case s: CreateNamedStruct => s --- End diff -- when will we hit this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
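To make the two cases in the doc comment concrete, here is a toy model of the match, assuming a heavily simplified expression tree. None of these classes are Catalyst's; they only mirror the names used in the diff, and the `"value"` column name in the fallback branch is an assumption made for the example.

```scala
object SerializerSketch {
  sealed trait Expr
  final case class Field(name: String) extends Expr
  final case class IsNull(child: Expr) extends Expr
  final case class If(predicate: Expr, trueValue: Expr, falseValue: Expr) extends Expr
  final case class CreateNamedStruct(fields: Seq[(String, Expr)]) extends Expr
  case object NullLiteral extends Expr

  // Returns the struct that describes the top-level columns.
  def topLevelStruct(objSerializer: Expr): CreateNamedStruct = objSerializer match {
    // Struct guarded by a null check: drop the guard, keep the struct.
    case If(_: IsNull, _, s: CreateNamedStruct) => s
    // Struct without a guard: use it directly.
    case s: CreateNamedStruct => s
    // Anything else is a single value: wrap it in a one-column struct.
    case other => CreateNamedStruct(Seq("value" -> other))
  }
}
```

In this model the second branch is reached only when the object serializer is already a bare struct with no surrounding null check, which is the situation the review question asks about.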
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22621 Why do we need a migration guide entry for a bug fix?
[GitHub] spark pull request #22814: [SPARK-25819][SQL] Support parse mode option for ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22814#discussion_r228063724 --- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala --- @@ -17,30 +17,49 @@ package org.apache.spark.sql.avro +import org.apache.avro.Schema + import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.RandomDataGenerator +import org.apache.spark.sql.{RandomDataGenerator, Row} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions.{ExpressionEvalHelper, GenericInternalRow, Literal} import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData} +import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ -class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalHelper { +class AvroCatalystDataConversionSuite extends SparkFunSuite + with SharedSQLContext + with ExpressionEvalHelper { private def roundTripTest(data: Literal): Unit = { val avroType = SchemaConverters.toAvroType(data.dataType, data.nullable) checkResult(data, avroType.toString, data.eval()) } private def checkResult(data: Literal, schema: String, expected: Any): Unit = { -checkEvaluation( - AvroDataToCatalyst(CatalystDataToAvro(data), schema), - prepareExpectedResult(expected)) +Seq("FAILFAST", "PERMISSIVE").foreach { mode => --- End diff -- why do we test 2 modes here? The mode is for when failure happens, but `checkResult` is for when no failures. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22814: [SPARK-25819][SQL] Support parse mode option for ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22814#discussion_r228063256 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala --- @@ -79,4 +80,23 @@ class AvroOptions( val compression: String = { parameters.get("compression").getOrElse(SQLConf.get.avroCompressionCodec) } + + @transient private val acceptableParseMode = Seq(PermissiveMode, FailFastMode) --- End diff -- Do we need to check the mode here? `AvroOptions` can also be used when reading Avro files, and `DROPMALFORMED` can be supported there.
[GitHub] spark pull request #22814: [SPARK-25819][SQL] Support parse mode option for ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22814#discussion_r228062884 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala --- @@ -31,10 +32,32 @@ package object avro { * @since 2.4.0 */ @Experimental - def from_avro(data: Column, jsonFormatSchema: String): Column = { -new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema)) + def from_avro( + data: Column, + jsonFormatSchema: String): Column = { +new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema, Map.empty)) } + /** + * Converts a binary column of avro format into its corresponding catalyst value. The specified + * schema must match the read data, otherwise the behavior is undefined: it may fail or return + * arbitrary result. + * + * @param data the binary column. + * @param jsonFormatSchema the avro schema in JSON string format. + * @param options options to control how the Avro record is parsed. + * + * @since 2.4.0 --- End diff -- 3.0.0 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22814: [SPARK-25819][SQL] Support parse mode option for ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22814#discussion_r228062545 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala --- @@ -44,24 +59,74 @@ case class AvroDataToCatalyst(child: Expression, jsonFormatSchema: String) @transient private var result: Any = _ + @transient private lazy val parseMode: ParseMode = { +val mode = AvroOptions(options).parseMode +if (mode != PermissiveMode && mode != FailFastMode) { + throw new AnalysisException(unacceptableModeMessage(mode.name)) +} +mode + } + + private def unacceptableModeMessage(name: String): String = { +s"from_avro() doesn't support the $name mode. " + + s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}." + } + + @transient private lazy val nullResultRow: Any = dataType match { + case st: StructType => +val resultRow = new SpecificInternalRow(st.map(_.dataType)) +for(i <- 0 until st.length) { + resultRow.setNullAt(i) +} +resultRow + + case _ => +null +} + + override def nullSafeEval(input: Any): Any = { val binary = input.asInstanceOf[Array[Byte]] -decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, decoder) -result = reader.read(result, decoder) -deserializer.deserialize(result) +try { + decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, decoder) + result = reader.read(result, decoder) + deserializer.deserialize(result) +} catch { + // There could be multiple possible exceptions here, e.g. java.io.IOException, + // AvroRuntimeException, ArrayIndexOutOfBoundsException, etc. + // To make it simple, catch all the exceptions here. + case e: Exception => parseMode match { +case PermissiveMode => nullResultRow +case FailFastMode => + throw new SparkException("Malformed records are detected in record parsing. 
" + +s"Parse Mode: ${FailFastMode.name}.", e.getCause) +case _ => + throw new AnalysisException(unacceptableModeMessage(parseMode.name)) + } +} } override def simpleString: String = { -s"from_avro(${child.sql}, ${dataType.simpleString})" +s"from_avro(${child.sql}, ${dataType.simpleString}, ${options.toString()})" } override def sql: String = { -s"from_avro(${child.sql}, ${dataType.catalogString})" +s"from_avro(${child.sql}, ${dataType.catalogString}, ${options.toString()})" } override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val expr = ctx.addReferenceObj("this", this) -defineCodeGen(ctx, ev, input => - s"(${CodeGenerator.boxedType(dataType)})$expr.nullSafeEval($input)") +nullSafeCodeGen(ctx, ev, eval => { + val result = ctx.freshName("tempResult") + s""" +${CodeGenerator.boxedType(dataType)} $result = --- End diff -- nit: maybe ``` val dt = CodeGenerator.boxedType(dataType) dt $result = ($dt) ... ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22814: [SPARK-25819][SQL] Support parse mode option for ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22814#discussion_r228061155 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala --- @@ -44,24 +59,74 @@ case class AvroDataToCatalyst(child: Expression, jsonFormatSchema: String) @transient private var result: Any = _ + @transient private lazy val parseMode: ParseMode = { +val mode = AvroOptions(options).parseMode +if (mode != PermissiveMode && mode != FailFastMode) { + throw new AnalysisException(unacceptableModeMessage(mode.name)) +} +mode + } + + private def unacceptableModeMessage(name: String): String = { +s"from_avro() doesn't support the $name mode. " + + s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}." + } + + @transient private lazy val nullResultRow: Any = dataType match { + case st: StructType => +val resultRow = new SpecificInternalRow(st.map(_.dataType)) +for(i <- 0 until st.length) { + resultRow.setNullAt(i) +} +resultRow + + case _ => +null +} + + override def nullSafeEval(input: Any): Any = { val binary = input.asInstanceOf[Array[Byte]] -decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, decoder) -result = reader.read(result, decoder) -deserializer.deserialize(result) +try { + decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, decoder) + result = reader.read(result, decoder) + deserializer.deserialize(result) +} catch { + // There could be multiple possible exceptions here, e.g. java.io.IOException, + // AvroRuntimeException, ArrayIndexOutOfBoundsException, etc. + // To make it simple, catch all the exceptions here. + case e: Exception => parseMode match { --- End diff -- we should catch `NonFatal` to be safer --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
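A minimal sketch of the `NonFatal` suggestion, using only the Scala standard library. The parser, the `Mode` hierarchy and the exception type thrown in fail-fast mode are placeholders invented for this example; only `scala.util.control.NonFatal` is the real API being illustrated. It matches ordinary exceptions but lets fatal ones (for example `InterruptedException` or `VirtualMachineError`) propagate.

```scala
import scala.util.control.NonFatal

object NonFatalSketch {
  sealed trait Mode
  case object Permissive extends Mode
  case object FailFast extends Mode

  // Parses an integer; on a recoverable failure the mode decides what happens.
  def parse(input: String, mode: Mode): Option[Int] =
    try Some(input.trim.toInt) catch {
      // NonFatal(e) does not match fatal throwables, so those are never swallowed.
      case NonFatal(e) => mode match {
        case Permissive => None
        case FailFast => throw new RuntimeException(s"Malformed record: $input", e)
      }
    }
}
```

Compared with `case e: Exception`, this avoids catching `InterruptedException`, and compared with `case e: Throwable` it avoids hiding JVM-level errors.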
[GitHub] spark pull request #22814: [SPARK-25819][SQL] Support parse mode option for ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22814#discussion_r228059940 --- Diff: docs/sql-data-sources-avro.md --- @@ -177,6 +180,18 @@ Data source options of Avro can be set using the `.option` method on `DataFrameR Currently supported codecs are uncompressed, snappy, deflate, bzip2 and xz. If the option is not set, the configuration spark.sql.avro.compression.codec config is taken into account. write + +mode +PERMISSIVE +The mode option allows to specify parse mode for function from_avro. + Currently supported modes are: + +PERMISSIVE: Corrupt records are processed as null result. --- End diff -- not null result, but a row with all columns null. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
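The distinction drawn here (a row whose columns are all null versus a null result) can be shown with a toy model. `Row` below is a plain type alias invented for the example, not Spark's `Row` or `InternalRow`, and `None` plays the role of a SQL null.

```scala
object NullRowSketch {
  // Toy model of a struct value: one optional slot per column.
  type Row = Vector[Option[Any]]

  // What a permissive parser could return for a corrupt record, given the column count:
  // a row that exists but whose columns are all null (None here).
  def allNullRow(numColumns: Int): Row = Vector.fill(numColumns)(None)
}
```

For a two-column schema this yields a value with two empty slots, which corresponds to the `Row(Row(null, null))` expectation in the `from_avro` test rather than to `Row(null)`.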
[GitHub] spark issue #22821: [SPARK-25832][SQL] remove newly added map related functi...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22821 cc @dongjoon-hyun @gatorsmile @rxin
[GitHub] spark pull request #22821: [SPARK-25832][] remove newly added map related fu...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/22821 [SPARK-25832][] remove newly added map related functions from FunctionRegistry ## What changes were proposed in this pull request? (Please fill in changes proposed in this fix) ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark revert Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22821.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22821 commit 7e919e3efa3d669a6893b47c737e553795d94347 Author: Wenchen Fan Date: 2018-10-25T04:42:17Z remove newly added map related functions from FunctionRegistry --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22773: [SPARK-25785][SQL] Add prettyNames for from_json, to_jso...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22773 I think the new names are better and expected, though it's safer to mention it in the migration guide in case some users care about it.
[GitHub] spark issue #22755: [SPARK-25755][SQL][Test] Supplementation of non-CodeGen ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22755 +1
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r228013784 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala --- @@ -45,6 +46,11 @@ case class CreateHiveTableAsSelectCommand( override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { val catalog = sparkSession.sessionState.catalog +val metastoreCatalog = catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog + +// Whether this table is convertible to data source relation. +val isConvertible = metastoreCatalog.isConvertible(tableDesc) --- End diff -- ah makes sense, thanks for trying!
[GitHub] spark issue #22809: [SPARK-19851][SQL] Add support for EVERY and ANY (SOME) ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22809 LGTM
[GitHub] spark pull request #22809: [SPARK-19851][SQL] Add support for EVERY and ANY ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22809#discussion_r228013503 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala --- @@ -727,4 +728,67 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext { "grouping expressions: [current_date(None)], value: [key: int, value: string], " + "type: GroupBy]")) } + + def getEveryAggColumn(columnName: String): Column = { +Column(new EveryAgg(Column(columnName).expr).toAggregateExpression(false)) --- End diff -- Since we don't have APIs for them in `functions`, it's not likely that users will use them with DataFrame. Thus I think we don't need these tests.
[GitHub] spark issue #22809: [SPARK-19851][SQL] Add support for EVERY and ANY (SOME) ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22809 Can we use these functions in window with this approach?
[GitHub] spark pull request #22809: [SPARK-19851][SQL] Add support for EVERY and ANY ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22809#discussion_r227899342 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala --- @@ -38,6 +39,18 @@ object ReplaceExpressions extends Rule[LogicalPlan] { } } +/** --- End diff -- ah this sounds better!
[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22144 Maybe we should discuss this in another thread. I think we should officially write down the procedure for evaluating a bug report and deciding whether to label it as a blocker. Currently https://spark.apache.org/contributing.html only says that data correctness issues should be blockers, which can't cover all cases (like this one). It's also inefficient to require a PMC vote on every issue to decide whether it's a blocker.
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r227867735 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala --- @@ -280,59 +281,59 @@ class ScalaReflectionSuite extends SparkFunSuite { test("serialize and deserialize arbitrary sequence types") { import scala.collection.immutable.Queue -val queueSerializer = serializerFor[Queue[Int]](BoundReference( - 0, ObjectType(classOf[Queue[Int]]), nullable = false)) -assert(queueSerializer.dataType.head.dataType == +val queueSerializer = serializerForType(ScalaReflection.localTypeOf[Queue[Int]]) +assert(queueSerializer.dataType == ArrayType(IntegerType, containsNull = false)) val queueDeserializer = deserializerFor[Queue[Int]] assert(queueDeserializer.dataType == ObjectType(classOf[Queue[_]])) import scala.collection.mutable.ArrayBuffer -val arrayBufferSerializer = serializerFor[ArrayBuffer[Int]](BoundReference( - 0, ObjectType(classOf[ArrayBuffer[Int]]), nullable = false)) -assert(arrayBufferSerializer.dataType.head.dataType == +val arrayBufferSerializer = serializerForType(ScalaReflection.localTypeOf[ArrayBuffer[Int]]) +assert(arrayBufferSerializer.dataType == ArrayType(IntegerType, containsNull = false)) val arrayBufferDeserializer = deserializerFor[ArrayBuffer[Int]] assert(arrayBufferDeserializer.dataType == ObjectType(classOf[ArrayBuffer[_]])) } test("serialize and deserialize arbitrary map types") { -val mapSerializer = serializerFor[Map[Int, Int]](BoundReference( - 0, ObjectType(classOf[Map[Int, Int]]), nullable = false)) -assert(mapSerializer.dataType.head.dataType == +val mapSerializer = serializerForType(ScalaReflection.localTypeOf[Map[Int, Int]]) +assert(mapSerializer.dataType == MapType(IntegerType, IntegerType, valueContainsNull = false)) val mapDeserializer = deserializerFor[Map[Int, Int]] assert(mapDeserializer.dataType == ObjectType(classOf[Map[_, _]])) import scala.collection.immutable.HashMap 
-val hashMapSerializer = serializerFor[HashMap[Int, Int]](BoundReference( - 0, ObjectType(classOf[HashMap[Int, Int]]), nullable = false)) -assert(hashMapSerializer.dataType.head.dataType == +val hashMapSerializer = serializerForType(ScalaReflection.localTypeOf[HashMap[Int, Int]]) +assert(hashMapSerializer.dataType == MapType(IntegerType, IntegerType, valueContainsNull = false)) val hashMapDeserializer = deserializerFor[HashMap[Int, Int]] assert(hashMapDeserializer.dataType == ObjectType(classOf[HashMap[_, _]])) import scala.collection.mutable.{LinkedHashMap => LHMap} -val linkedHashMapSerializer = serializerFor[LHMap[Long, String]](BoundReference( - 0, ObjectType(classOf[LHMap[Long, String]]), nullable = false)) -assert(linkedHashMapSerializer.dataType.head.dataType == +val linkedHashMapSerializer = serializerForType( +ScalaReflection.localTypeOf[LHMap[Long, String]]) +assert(linkedHashMapSerializer.dataType == MapType(LongType, StringType, valueContainsNull = true)) val linkedHashMapDeserializer = deserializerFor[LHMap[Long, String]] assert(linkedHashMapDeserializer.dataType == ObjectType(classOf[LHMap[_, _]])) } test("SPARK-22442: Generate correct field names for special characters") { -val serializer = serializerFor[SpecialCharAsFieldData](BoundReference( - 0, ObjectType(classOf[SpecialCharAsFieldData]), nullable = false)) +val serializer = serializerForType(ScalaReflection.localTypeOf[SpecialCharAsFieldData]) --- End diff -- like `deserializerFor` in this suite, let's also create a `serializerFor`
[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22144 @tgravescs please quote my full comment instead of part of it. > After all, this is a bug and a regression from previous releases, like other 1000 we've fixed before. The point I was making there is that this issue is not one of those that HAVE TO block a release, like a correctness issue. Immediately afterward I listed the reasons why I don't think it's a blocker. > hive compatibility is not that important to Spark at this point I'm sorry if this worries you. It's true that recent development has focused more on Spark itself than on Hive compatibility, but this should not apply to the existing Hive compatibility features in Spark, and we should still maintain them. BTW, I removed the `supportsPartial` flag because no aggregate functions in Spark need it (including the adapted Hive UDAF), but the problem lies in how we adapt Hive UDAF, which was introduced by https://issues.apache.org/jira/browse/SPARK-18186
[GitHub] spark issue #22730: [SPARK-16775][CORE] Remove deprecated accumulator v1 API...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22730 A late LGTM
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r227783724 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala --- @@ -280,59 +281,59 @@ class ScalaReflectionSuite extends SparkFunSuite { test("serialize and deserialize arbitrary sequence types") { import scala.collection.immutable.Queue -val queueSerializer = serializerFor[Queue[Int]](BoundReference( - 0, ObjectType(classOf[Queue[Int]]), nullable = false)) -assert(queueSerializer.dataType.head.dataType == +val queueSerializer = serializerForType(ScalaReflection.localTypeOf[Queue[Int]]) +assert(queueSerializer.dataType == ArrayType(IntegerType, containsNull = false)) val queueDeserializer = deserializerFor[Queue[Int]] assert(queueDeserializer.dataType == ObjectType(classOf[Queue[_]])) import scala.collection.mutable.ArrayBuffer -val arrayBufferSerializer = serializerFor[ArrayBuffer[Int]](BoundReference( - 0, ObjectType(classOf[ArrayBuffer[Int]]), nullable = false)) -assert(arrayBufferSerializer.dataType.head.dataType == +val arrayBufferSerializer = serializerForType(ScalaReflection.localTypeOf[ArrayBuffer[Int]]) +assert(arrayBufferSerializer.dataType == ArrayType(IntegerType, containsNull = false)) val arrayBufferDeserializer = deserializerFor[ArrayBuffer[Int]] assert(arrayBufferDeserializer.dataType == ObjectType(classOf[ArrayBuffer[_]])) } test("serialize and deserialize arbitrary map types") { -val mapSerializer = serializerFor[Map[Int, Int]](BoundReference( - 0, ObjectType(classOf[Map[Int, Int]]), nullable = false)) -assert(mapSerializer.dataType.head.dataType == +val mapSerializer = serializerForType(ScalaReflection.localTypeOf[Map[Int, Int]]) +assert(mapSerializer.dataType == MapType(IntegerType, IntegerType, valueContainsNull = false)) val mapDeserializer = deserializerFor[Map[Int, Int]] assert(mapDeserializer.dataType == ObjectType(classOf[Map[_, _]])) import scala.collection.immutable.HashMap 
-val hashMapSerializer = serializerFor[HashMap[Int, Int]](BoundReference( - 0, ObjectType(classOf[HashMap[Int, Int]]), nullable = false)) -assert(hashMapSerializer.dataType.head.dataType == +val hashMapSerializer = serializerForType(ScalaReflection.localTypeOf[HashMap[Int, Int]]) +assert(hashMapSerializer.dataType == MapType(IntegerType, IntegerType, valueContainsNull = false)) val hashMapDeserializer = deserializerFor[HashMap[Int, Int]] assert(hashMapDeserializer.dataType == ObjectType(classOf[HashMap[_, _]])) import scala.collection.mutable.{LinkedHashMap => LHMap} -val linkedHashMapSerializer = serializerFor[LHMap[Long, String]](BoundReference( - 0, ObjectType(classOf[LHMap[Long, String]]), nullable = false)) -assert(linkedHashMapSerializer.dataType.head.dataType == +val linkedHashMapSerializer = serializerForType( +ScalaReflection.localTypeOf[LHMap[Long, String]]) +assert(linkedHashMapSerializer.dataType == MapType(LongType, StringType, valueContainsNull = true)) val linkedHashMapDeserializer = deserializerFor[LHMap[Long, String]] assert(linkedHashMapDeserializer.dataType == ObjectType(classOf[LHMap[_, _]])) } test("SPARK-22442: Generate correct field names for special characters") { -val serializer = serializerFor[SpecialCharAsFieldData](BoundReference( - 0, ObjectType(classOf[SpecialCharAsFieldData]), nullable = false)) +val serializer = serializerForType(ScalaReflection.localTypeOf[SpecialCharAsFieldData]) --- End diff -- can we replace all the `serializerForType` with `serializerFor` in this suite?
[GitHub] spark issue #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_json
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22237 thanks, merging to master!
[GitHub] spark issue #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder to get ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22749 LGTM except 2 minor comments
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r227742062 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala --- @@ -351,11 +347,15 @@ class ScalaReflectionSuite extends SparkFunSuite { test("SPARK-23835: add null check to non-nullable types in Tuples") { def numberOfCheckedArguments(deserializer: Expression): Int = { - assert(deserializer.isInstanceOf[NewInstance]) - deserializer.asInstanceOf[NewInstance].arguments.count(_.isInstanceOf[AssertNotNull]) + val newInstance = deserializer.collect { case n: NewInstance => n}.head + newInstance.arguments.count(_.isInstanceOf[AssertNotNull]) } -assert(numberOfCheckedArguments(deserializerFor[(Double, Double)]) == 2) -assert(numberOfCheckedArguments(deserializerFor[(java.lang.Double, Int)]) == 1) -assert(numberOfCheckedArguments(deserializerFor[(java.lang.Integer, java.lang.Integer)]) == 0) +assert(numberOfCheckedArguments( + deserializerForType(ScalaReflection.localTypeOf[(Double, Double)])) == 2) --- End diff -- shall we create a `deserializerFor` method in this test suite to save some code diff?
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r227739775 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala --- @@ -212,21 +181,90 @@ object ExpressionEncoder { * A generic encoder for JVM objects that uses Catalyst Expressions for a `serializer` * and a `deserializer`. * - * @param schema The schema after converting `T` to a Spark SQL row. - * @param serializer A set of expressions, one for each top-level field that can be used to - * extract the values from a raw object into an [[InternalRow]]. - * @param deserializer An expression that will construct an object given an [[InternalRow]]. + * @param objSerializer An expression that can be used to encode a raw object to corresponding + * Spark SQL representation that can be a primitive column, array, map or a + * struct. This represents how Spark SQL generally serializes an object of + * type `T`. + * @param objDeserializer An expression that will construct an object given a Spark SQL + *representation. This represents how Spark SQL generally deserializes + *a serialized value in Spark SQL representation back to an object of + *type `T`. * @param clsTag A classtag for `T`. */ case class ExpressionEncoder[T]( -schema: StructType, -flat: Boolean, -serializer: Seq[Expression], -deserializer: Expression, +objSerializer: Expression, +objDeserializer: Expression, clsTag: ClassTag[T]) extends Encoder[T] { - if (flat) require(serializer.size == 1) + /** + * A sequence of expressions, one for each top-level field that can be used to + * extract the values from a raw object into an [[InternalRow]]: + * 1. If `serializer` encodes a raw object to a struct, we directly use the `serializer`. + * 2. For other cases, we create a struct to wrap the `serializer`. --- End diff -- Let's make these 2 comments more precise ``` 1. 
If `serializer` encodes a raw object to a struct, strip the outer if-IsNull and get the CreateNamedStruct 2. For other cases, wrap the single serializer with CreateNamedStruct ```
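The two cases in the suggested comment can be sketched with a toy expression tree. The classes below are minimal stand-ins written for this illustration, not the Catalyst classes, and `topLevelStruct` is a hypothetical name; the `"value"` field name follows the `Literal("value")` wrapper visible in the `ScalaReflection` diff of this pull request.

```scala
// Minimal stand-ins for Catalyst expressions; not the real Spark classes.
sealed trait Expr
case class Leaf(name: String) extends Expr
case class IsNull(child: Expr) extends Expr
case object NullLiteral extends Expr
case class CreateNamedStruct(fields: List[(String, Expr)]) extends Expr
case class If(predicate: Expr, trueValue: Expr, falseValue: Expr) extends Expr

object SerializerSketch {
  // Turns an object serializer into the struct that describes the top-level row.
  def topLevelStruct(objSerializer: Expr): CreateNamedStruct = objSerializer match {
    // Case 1: a struct guarded by a null check. Strip the outer If and keep the struct.
    case If(_: IsNull, _, struct: CreateNamedStruct) => struct
    // Case 2: anything else is wrapped as a single field named "value".
    case other => CreateNamedStruct(List("value" -> other))
  }
}
```

Under these assumptions, a case-class-like serializer contributes one top-level column per field, while a primitive, array or map serializer contributes a single `value` column.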
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r227682823 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala --- @@ -58,12 +58,10 @@ object RowEncoder { def apply(schema: StructType): ExpressionEncoder[Row] = { val cls = classOf[Row] val inputObject = BoundReference(0, ObjectType(cls), nullable = true) -val serializer = serializerFor(AssertNotNull(inputObject, Seq("top level row object")), schema) -val deserializer = deserializerFor(schema) +val serializer = serializerFor(inputObject, schema) +val deserializer = deserializerFor(GetColumnByOrdinal(0, serializer.dataType), schema) --- End diff -- ah i see, then let's leave it.
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r227682672 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala --- @@ -434,17 +426,34 @@ object ScalaReflection extends ScalaReflection { * * the element type of [[Array]] or [[Seq]]: `array element class: "abc.xyz.MyClass"` * * the field of [[Product]]: `field (class: "abc.xyz.MyClass", name: "myField")` */ - def serializerFor[T : TypeTag](inputObject: Expression): CreateNamedStruct = { -val tpe = localTypeOf[T] + def serializerForType(tpe: `Type`, + cls: RuntimeClass): Expression = ScalaReflection.cleanUpReflectionObjects { val clsName = getClassNameFromType(tpe) val walkedTypePath = s"""- root class: "$clsName"""" :: Nil -serializerFor(inputObject, tpe, walkedTypePath) match { - case expressions.If(_, _, s: CreateNamedStruct) if definedByConstructorParams(tpe) => s - case other => CreateNamedStruct(expressions.Literal("value") :: other :: Nil) -} + +// The input object to `ExpressionEncoder` is located at first column of an row. +val inputObject = BoundReference(0, dataTypeFor(tpe), + nullable = !cls.isPrimitive) --- End diff -- good, then we don't need `cls` as a parameter.
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r227681844 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -1087,7 +1087,7 @@ class Dataset[T] private[sql]( // Note that we do this before joining them, to enable the join operator to return null for one // side, in cases like outer-join. val left = { - val combined = if (this.exprEnc.flat) { + val combined = if (!this.exprEnc.objSerializer.dataType.isInstanceOf[StructType]) { --- End diff -- shall we create a method in `ExpressionEncoder` for this check?
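A minimal sketch of what such a helper might look like, using toy types rather than the Spark classes. `EncoderSketch`, `serializedType` and the method name `isSerializedAsStruct` are assumptions made for this illustration; the comment itself only asks for a method that hides the `isInstanceOf[StructType]` check.

```scala
// Toy data types standing in for Spark SQL's DataType hierarchy.
sealed trait DataType
case object IntegerType extends DataType
case class StructType(fieldNames: List[String]) extends DataType

// Stand-in for an encoder that only remembers its serialized data type.
case class EncoderSketch(serializedType: DataType) {
  // Hypothetical helper: true when the object is serialized to a struct,
  // so call sites no longer repeat the isInstanceOf check.
  def isSerializedAsStruct: Boolean = serializedType.isInstanceOf[StructType]
}
```

A call site such as the one in the diff could then read `if (!enc.isSerializedAsStruct)`, which states the intent rather than the type test.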
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r227675880 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala --- @@ -58,12 +58,10 @@ object RowEncoder { def apply(schema: StructType): ExpressionEncoder[Row] = { val cls = classOf[Row] val inputObject = BoundReference(0, ObjectType(cls), nullable = true) -val serializer = serializerFor(AssertNotNull(inputObject, Seq("top level row object")), schema) -val deserializer = deserializerFor(schema) +val serializer = serializerFor(inputObject, schema) +val deserializer = deserializerFor(GetColumnByOrdinal(0, serializer.dataType), schema) --- End diff -- in `ScalaReflection`, we create `GetColumnByOrdinal` in `deserializerFor`, shall we follow it here?
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r227673675 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala --- @@ -434,17 +426,34 @@ object ScalaReflection extends ScalaReflection { * * the element type of [[Array]] or [[Seq]]: `array element class: "abc.xyz.MyClass"` * * the field of [[Product]]: `field (class: "abc.xyz.MyClass", name: "myField")` */ - def serializerFor[T : TypeTag](inputObject: Expression): CreateNamedStruct = { -val tpe = localTypeOf[T] + def serializerForType(tpe: `Type`, + cls: RuntimeClass): Expression = ScalaReflection.cleanUpReflectionObjects { val clsName = getClassNameFromType(tpe) val walkedTypePath = s"""- root class: "$clsName"""" :: Nil -serializerFor(inputObject, tpe, walkedTypePath) match { - case expressions.If(_, _, s: CreateNamedStruct) if definedByConstructorParams(tpe) => s - case other => CreateNamedStruct(expressions.Literal("value") :: other :: Nil) -} + +// The input object to `ExpressionEncoder` is located at first column of an row. +val inputObject = BoundReference(0, dataTypeFor(tpe), + nullable = !cls.isPrimitive) + +serializerFor(inputObject, tpe, walkedTypePath) } - /** Helper for extracting internal fields from a case class. */ + /** + * Returns an expression for serializing the value of an input expression into Spark SQL --- End diff -- do we really need to duplicate the doc in this private method?
[GitHub] spark pull request #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22749#discussion_r227672066 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala --- @@ -434,17 +426,34 @@ object ScalaReflection extends ScalaReflection { * * the element type of [[Array]] or [[Seq]]: `array element class: "abc.xyz.MyClass"` * * the field of [[Product]]: `field (class: "abc.xyz.MyClass", name: "myField")` */ - def serializerFor[T : TypeTag](inputObject: Expression): CreateNamedStruct = { -val tpe = localTypeOf[T] + def serializerForType(tpe: `Type`, + cls: RuntimeClass): Expression = ScalaReflection.cleanUpReflectionObjects { val clsName = getClassNameFromType(tpe) val walkedTypePath = s"""- root class: "$clsName"""" :: Nil -serializerFor(inputObject, tpe, walkedTypePath) match { - case expressions.If(_, _, s: CreateNamedStruct) if definedByConstructorParams(tpe) => s - case other => CreateNamedStruct(expressions.Literal("value") :: other :: Nil) -} + +// The input object to `ExpressionEncoder` is located at first column of an row. +val inputObject = BoundReference(0, dataTypeFor(tpe), + nullable = !cls.isPrimitive) --- End diff -- we just check isPrimitive of the given `cls`, can we check `tpe` directly?
[GitHub] spark issue #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_json
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22237 retest this please
[GitHub] spark issue #22812: [SPARK-25817][SQL] Dataset encoder should support combin...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22812 retest this please
[GitHub] spark issue #22749: [SPARK-25746][SQL] Refactoring ExpressionEncoder to get ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22749 hmm, it still has conflict...
[GitHub] spark pull request #21860: [SPARK-24901][SQL]Merge the codegen of RegularHas...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21860#discussion_r227655432 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala --- @@ -831,7 +832,14 @@ case class HashAggregateExec( ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input val updateRowInRegularHashMap: String = { - ctx.INPUT_ROW = unsafeRowBuffer + val updatedTmpAggBuffer = +if (isFastHashMapEnabled && !isVectorizedHashMapEnabled) { + updatedAggBuffer --- End diff -- This also simplifies the generated code. We don't need an if-else to assign a value to this new variable.
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r227654755 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala --- @@ -45,6 +46,11 @@ case class CreateHiveTableAsSelectCommand( override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { val catalog = sparkSession.sessionState.catalog +val metastoreCatalog = catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog + +// Whether this table is convertible to data source relation. +val isConvertible = metastoreCatalog.isConvertible(tableDesc) --- End diff -- I feel `CreateHiveTableAsSelectCommand` is not useful. It simply creates the table first and then calls `InsertIntoHiveTable.run`. Maybe we should just remove it and implement hive table CTAS as `Union(CreateTable, InsertIntoTable)`.
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r227654240 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala --- @@ -45,6 +46,11 @@ case class CreateHiveTableAsSelectCommand( override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { val catalog = sparkSession.sessionState.catalog +val metastoreCatalog = catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog + +// Whether this table is convertible to data source relation. +val isConvertible = metastoreCatalog.isConvertible(tableDesc) --- End diff -- another idea: can we move this logic to the `RelationConversions` rule? e.g. ``` case CreateTable(tbl, mode, Some(query)) if DDLUtils.isHiveTable(tbl) && isConvertible(tbl) => Union(CreateTable(tbl, mode, None), InsertIntoTable ...) ```
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r227653464 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala --- @@ -92,4 +92,18 @@ class HiveParquetSuite extends QueryTest with ParquetTest with TestHiveSingleton } } } + + test("SPARK-25271: write empty map into hive parquet table") { +val testData = hiveContext.getHiveFile("data/files/empty_map.dat").getCanonicalFile() +val sourceTable = "sourceTable" +val targetTable = "targetTable" +withTable(sourceTable, targetTable) { + sql(s"CREATE TABLE $sourceTable (i int,m map) ROW FORMAT DELIMITED FIELDS " + +"TERMINATED BY ',' COLLECTION ITEMS TERMINATED BY ':' MAP KEYS TERMINATED BY '$'") + sql(s"LOAD DATA LOCAL INPATH '${testData.toURI}' INTO TABLE $sourceTable") --- End diff -- can we generate the input data with a temp view? e.g. create a dataframe with literals and register temp view.
[GitHub] spark pull request #21860: [SPARK-24901][SQL]Merge the codegen of RegularHas...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21860#discussion_r227650326 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala --- @@ -831,7 +832,14 @@ case class HashAggregateExec( ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input val updateRowInRegularHashMap: String = { - ctx.INPUT_ROW = unsafeRowBuffer + val updatedTmpAggBuffer = +if (isFastHashMapEnabled && !isVectorizedHashMapEnabled) { + updatedAggBuffer --- End diff -- just realized it. Do we create the `updatedAggBuffer` variable only to improve the readability of the generated code? It looks to me like we don't need this variable. Here we can write ``` ctx.INPUT_ROW = if (isFastHashMapEnabled && !isVectorizedHashMapEnabled) fastRowBuffer else unsafeRowBuffer ```
[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22144 Unfortunately, we didn't drop it mistakenly. It's a mistake and we should fix it. What I'm trying to avoid is adding back the `supportsPartial` flag. We should look into the root cause and see how to fix it better. I don't know if this policy is written down officially, but I do remember we followed this policy many times in previous releases. Please correct me if I am wrong. I'll list it as a known issue in the 2.4.0 release notes. It would be great if someone could investigate the root cause and propose a fix (with a test).
[GitHub] spark issue #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_json
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22237 LGTM, pending jenkins.
[GitHub] spark issue #22812: [SPARK-25817][SQL] Dataset encoder should support combin...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22812 cc @michalsenkyr @vofque @viirya
[GitHub] spark pull request #22812: [SPARK-25817][SQL] Dataset encoder should support...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22812#discussion_r227638941 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -1837,8 +1837,6 @@ case class GetArrayFromMap private( arrayGetter: MapData => ArrayData, elementTypeGetter: MapType => DataType) extends UnaryExpression with NonSQLExpression { - private lazy val encodedFunctionName: String = TermName(functionName).encodedName.toString --- End diff -- this is to address https://github.com/apache/spark/pull/22745#discussion_r227407344
[GitHub] spark pull request #22812: [SPARK-25817][SQL] Dataset encoder should support...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22812#discussion_r227638902 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -1090,15 +1096,9 @@ case class CatalystToExternalMap private( val tupleLoopValue = ctx.freshName("tupleLoopValue") val builderValue = ctx.freshName("builderValue") -val getLength = s"${genInputData.value}.numElements()" --- End diff -- These changes are unrelated to this PR's main fix; they are a followup of https://github.com/apache/spark/pull/16986 to address the remaining comments.
[GitHub] spark pull request #22812: [SPARK-25817][SQL] Dataset encoder should support...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/22812 [SPARK-25817][SQL] Dataset encoder should support combination of map and product type ## What changes were proposed in this pull request? After https://github.com/apache/spark/pull/22745 , the Dataset encoder supports the combination of Java bean and map type. This PR fixes the Scala side. The reason it didn't work before is that `CatalystToExternalMap` tries to get the data type of the input map expression, while that expression can be unresolved and its data type unknown. To fix it, we can follow `UnresolvedMapObjects` and create an `UnresolvedCatalystToExternalMap`, and only create `CatalystToExternalMap` when the input map expression is resolved and the data type is known. ## How was this patch tested? Enabled an old test case. You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark map Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22812.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22812 commit da31d2602b8e12eb8949336cf14b903c0df731cf Author: Wenchen Fan Date: 2018-10-24T04:21:44Z Dataset encoder should support combination of map and product type
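The placeholder idea described in this PR can be sketched without Spark. What follows is a simplified model, not the actual Catalyst code: every type and name in it (`Input`, `UnresolvedToExternalMap`, `ToExternalMap`, `resolve`) is hypothetical and only illustrates deferring construction of the concrete node until the child's data type is known.

```scala
// Simplified model; these types are illustrative and are NOT Spark's classes.
object UnresolvedSketch {
  sealed trait DataType
  case object IntType extends DataType
  case object StringType extends DataType
  final case class MapType(keyType: DataType, valueType: DataType) extends DataType

  sealed trait Expr { def resolved: Boolean }

  // An input whose data type stays unknown (None) until analysis binds it.
  final case class Input(name: String, dataType: Option[DataType]) extends Expr {
    def resolved: Boolean = dataType.isDefined
  }

  // Placeholder node: holds the child but never inspects its data type.
  final case class UnresolvedToExternalMap(child: Input) extends Expr {
    def resolved: Boolean = false
  }

  // Concrete node: built only once the key and value types are known.
  final case class ToExternalMap(child: Input, keyType: DataType, valueType: DataType)
      extends Expr {
    def resolved: Boolean = true
  }

  // Replace the placeholder only when the child has resolved to a map type;
  // anything else is returned unchanged so a later pass can retry.
  def resolve(e: Expr): Expr = e match {
    case UnresolvedToExternalMap(in @ Input(_, Some(MapType(k, v)))) =>
      ToExternalMap(in, k, v)
    case other => other
  }
}
```

In this model, calling `resolve` on a placeholder whose child has no type yet returns the placeholder untouched, which is the property that avoids asking an unresolved expression for its data type.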
[GitHub] spark issue #22514: [SPARK-25271][SQL] Hive ctas commands should use data so...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22514 sounds like a clean solution. please go ahead, thanks!
[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22512#discussion_r227618778 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MutableProjectionSuite.scala --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +class MutableProjectionSuite extends SparkFunSuite with ExpressionEvalHelper { + + private def createMutableProjection(dataTypes: Array[DataType]): MutableProjection = { +MutableProjection.create(dataTypes.zipWithIndex.map(x => BoundReference(x._2, x._1, true))) + } + + testBothCodegenAndInterpreted("fixed-length types") { +val fixedLengthTypes = Array[DataType]( + BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, + DateType, TimestampType) +val proj = createMutableProjection(fixedLengthTypes) +val inputRow = InternalRow.fromSeq( + Seq(false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 5.0, 100, 200L)) +assert(proj(inputRow) === inputRow) + +// Use UnsafeRow as buffer +val numBytes = UnsafeRow.calculateBitSetWidthInBytes(fixedLengthTypes.length) +val unsafeBuffer = UnsafeRow.createFromByteArray(numBytes, fixedLengthTypes.length) +val projUnsafeRow = proj.target(unsafeBuffer)(inputRow) +assert(FromUnsafeProjection(fixedLengthTypes)(projUnsafeRow) === inputRow) + } + + testBothCodegenAndInterpreted("variable-length types") { +val variableLengthTypes = Array( + StringType, DecimalType.defaultConcreteType, CalendarIntervalType, BinaryType, + ArrayType(StringType), MapType(IntegerType, StringType), + StructType.fromDDL("a INT, b STRING"), ObjectType(classOf[java.lang.Integer])) +val proj = createMutableProjection(variableLengthTypes) --- End diff -- shall we also test that we should fail if the target row is unsafe row? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22512#discussion_r227618313 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala --- @@ -143,4 +144,25 @@ object InternalRow { case u: UserDefinedType[_] => getAccessor(u.sqlType) case _ => (input, ordinal) => input.get(ordinal, dataType) } + + /** + * Returns a writer for an `InternalRow` with given data type. + */ + def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match { +case BooleanType => (input, v) => input.setBoolean(ordinal, v.asInstanceOf[Boolean]) +case ByteType => (input, v) => input.setByte(ordinal, v.asInstanceOf[Byte]) +case ShortType => (input, v) => input.setShort(ordinal, v.asInstanceOf[Short]) +case IntegerType | DateType => (input, v) => input.setInt(ordinal, v.asInstanceOf[Int]) +case LongType | TimestampType => (input, v) => input.setLong(ordinal, v.asInstanceOf[Long]) +case FloatType => (input, v) => input.setFloat(ordinal, v.asInstanceOf[Float]) +case DoubleType => (input, v) => input.setDouble(ordinal, v.asInstanceOf[Double]) +case DecimalType.Fixed(precision, _) => + (input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal], precision) +case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType | + _: MapType | _: ObjectType => + (input, v) => input.update(ordinal, v) +case udt: UserDefinedType[_] => getWriter(ordinal, udt.sqlType) +case NullType => (input, _) => input.setNullAt(ordinal) +case _ => throw new SparkException(s"Unsupported data type $dt") --- End diff -- One minor point: the codegen version just calls `row.update` for unmatched types, which means it supports object type as well. Shall we follow it?
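The three behaviours discussed for `getWriter` (recursing into a UDT's underlying type, writing null for `NullType`, and falling back to `update` for unmatched types) can be shown in a small Spark-free sketch. The types below are hypothetical stand-ins, not `InternalRow` or Spark's `DataType` hierarchy, and the fallback branch reflects the suggestion in the comment rather than the code in the diff.

```scala
// Simplified model; these types are illustrative and are NOT Spark's classes.
object WriterSketch {
  sealed trait DataType
  case object IntegerType extends DataType
  case object NullType extends DataType
  final case class UserDefinedType(sqlType: DataType) extends DataType
  final case class OtherType(name: String) extends DataType

  // A tiny mutable row backed by a generic array.
  final class MutableRow(numFields: Int) {
    private val values = new Array[Any](numFields)
    def setInt(i: Int, v: Int): Unit = { values(i) = v }
    def setNullAt(i: Int): Unit = { values(i) = null }
    def update(i: Int, v: Any): Unit = { values(i) = v }
    def get(i: Int): Any = values(i)
  }

  // The match on the data type runs once; the returned closure is reused per row.
  def getWriter(ordinal: Int, dt: DataType): (MutableRow, Any) => Unit = dt match {
    case IntegerType => (row, v) => row.setInt(ordinal, v.asInstanceOf[Int])
    // A null-typed column can only hold null, so the value is ignored.
    case NullType => (row, _) => row.setNullAt(ordinal)
    // Recurse into the type the UDT is stored as.
    case UserDefinedType(sqlType) => getWriter(ordinal, sqlType)
    // Fallback: generic update instead of throwing for unmatched types.
    case _ => (row, v) => row.update(ordinal, v)
  }
}
```

Building the writer once per column and reusing it keeps the per-row path free of type dispatch, which is presumably why the function returns a closure instead of taking the row and value directly.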
[GitHub] spark issue #22745: [SPARK-25772][SQL] Fix java map of structs deserializati...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22745 thanks, merging to master!
[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22745#discussion_r227611044 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -1787,3 +1788,78 @@ case class ValidateExternalType(child: Expression, expected: DataType) ev.copy(code = code, isNull = input.isNull) } } + +object GetKeyArrayFromMap { + + /** + * Construct an instance of GetArrayFromMap case class + * extracting a key array from a Map expression. + * + * @param child a Map expression to extract a key array from + */ + def apply(child: Expression): Expression = { +GetArrayFromMap( + child, + "keyArray", + _.keyArray(), + { case MapType(kt, _, _) => kt }) + } +} + +object GetValueArrayFromMap { + + /** + * Construct an instance of GetArrayFromMap case class + * extracting a value array from a Map expression. + * + * @param child a Map expression to extract a value array from + */ + def apply(child: Expression): Expression = { +GetArrayFromMap( + child, + "valueArray", + _.valueArray(), + { case MapType(_, vt, _) => vt }) + } +} + +/** + * Extracts a key/value array from a Map expression. + * + * @param child a Map expression to extract an array from + * @param functionName name of the function that is invoked to extract an array + * @param arrayGetter function extracting `ArrayData` from `MapData` + * @param elementTypeGetter function extracting array element `DataType` from `MapType` + */ +case class GetArrayFromMap private( +child: Expression, +functionName: String, +arrayGetter: MapData => ArrayData, +elementTypeGetter: MapType => DataType) extends UnaryExpression with NonSQLExpression { + + private lazy val encodedFunctionName: String = TermName(functionName).encodedName.toString --- End diff -- We forgot to remove it. It's not a big issue and we don't need to waste one more QA round for it. I'll fix it in a followup PR that apply this approach to `ScalaReflection`. 
[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22144 After all, this is a bug and a regression from previous releases, like the other 1000 we've fixed before. According to the policy, we don't have to block the current release because of it, and this bug: 1. is a Hive compatibility bug (Spark fails to run some Hive UDAFs); 2. fails the job instead of returning a wrong result; 3. has an unknown root cause. Thus, I think it's a non-blocker. I looked into it more and I'm 90% sure this is caused by https://issues.apache.org/jira/browse/SPARK-18186 . We should spend more time on understanding how Hive executes UDAFs and fix the way we adapt them to Spark aggregate functions.
[GitHub] spark issue #22788: [SPARK-25769][SQL]escape nested columns by backtick each...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22788 I agree with the problem described in the PR description that `UnresolvedAttribute.sql` is not ideal. But we should just update `UnresolvedAttribute.sql`, not the `name` method. `name` is used in other places and I think it has no problem.
[GitHub] spark issue #22514: [SPARK-25271][SQL] Hive ctas commands should use data so...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22514 @viirya can you explain the high-level idea about how to fix it? It seems hard to fix and we should get a consensus on the approach first.
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r227602783 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala --- @@ -34,11 +34,16 @@ import org.apache.spark.sql.types._ * @param tableDesc the metadata of the table to be created. * @param mode the data writing mode * @param query an optional logical plan representing data to write into the created table. + * @param useExternalSerde whether to use external serde to write data, e.g., Hive Serde. Currently --- End diff -- I don't have a clear idea now, but `CreateTable` is a general logical plan for CREATE TABLE, and we may even make it public in the data source/catalog APIs in the future, so we should not put Hive-specific concepts here.
[GitHub] spark issue #22514: [SPARK-25271][SQL] Hive ctas commands should use data so...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22514 Yes, this is a performance regression for users who run CTAS on Hive serde tables. It has been a regression since Spark 2.3.0.
[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22144 This is not a PR that is ready to merge. We are likely talking about delaying 2.4.0 for multiple weeks because of this issue. Is it really worth it? I'm not sure what the exact policy is, so let's ping more people. @rxin @srowen @vanzin @felixcheung @gatorsmile
[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22512#discussion_r227433044 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala --- @@ -143,4 +144,24 @@ object InternalRow { case u: UserDefinedType[_] => getAccessor(u.sqlType) case _ => (input, ordinal) => input.get(ordinal, dataType) } + + /** + * Returns a writer for an `InternalRow` with given data type. + */ + def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match { +case BooleanType => (input, v) => input.setBoolean(ordinal, v.asInstanceOf[Boolean]) +case ByteType => (input, v) => input.setByte(ordinal, v.asInstanceOf[Byte]) +case ShortType => (input, v) => input.setShort(ordinal, v.asInstanceOf[Short]) +case IntegerType | DateType => (input, v) => input.setInt(ordinal, v.asInstanceOf[Int]) +case LongType | TimestampType => (input, v) => input.setLong(ordinal, v.asInstanceOf[Long]) +case FloatType => (input, v) => input.setFloat(ordinal, v.asInstanceOf[Float]) +case DoubleType => (input, v) => input.setDouble(ordinal, v.asInstanceOf[Double]) +case DecimalType.Fixed(precision, _) => + (input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal], precision) +case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType | + _: MapType | _: ObjectType | _: UserDefinedType[_] => --- End diff -- We should recurse into the UDT, i.e. build the writer from its underlying `sqlType`.
[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22512#discussion_r227432603 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala --- @@ -143,4 +144,24 @@ object InternalRow { case u: UserDefinedType[_] => getAccessor(u.sqlType) case _ => (input, ordinal) => input.get(ordinal, dataType) } + + /** + * Returns a writer for an `InternalRow` with given data type. + */ + def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match { +case BooleanType => (input, v) => input.setBoolean(ordinal, v.asInstanceOf[Boolean]) +case ByteType => (input, v) => input.setByte(ordinal, v.asInstanceOf[Byte]) +case ShortType => (input, v) => input.setShort(ordinal, v.asInstanceOf[Short]) +case IntegerType | DateType => (input, v) => input.setInt(ordinal, v.asInstanceOf[Int]) +case LongType | TimestampType => (input, v) => input.setLong(ordinal, v.asInstanceOf[Long]) +case FloatType => (input, v) => input.setFloat(ordinal, v.asInstanceOf[Float]) +case DoubleType => (input, v) => input.setDouble(ordinal, v.asInstanceOf[Double]) +case DecimalType.Fixed(precision, _) => + (input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal], precision) +case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType | + _: MapType | _: ObjectType | _: UserDefinedType[_] => + (input, v) => input.update(ordinal, v) +case NullType => (input, v) => {} --- End diff -- In the codegen version, `CodeGenerator.setColumn`, we don't match NullType and eventually call `row.update(i, null)`. Shall we follow it and call `row.setNullAt` here?
[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22745#discussion_r227431215 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -1787,3 +1788,79 @@ case class ValidateExternalType(child: Expression, expected: DataType) ev.copy(code = code, isNull = input.isNull) } } + +object GetKeyArrayFromMap { + + /** + * Construct an instance of GetArrayFromMap case class + * extracting a key array from a Map expression. + * + * @param child a Map expression to extract a key array from + */ + def apply(child: Expression): Expression = { +GetArrayFromMap( + child, + "keyArray", + _.keyArray(), + { case MapType(kt, _, _) => kt }) + } +} + +object GetValueArrayFromMap { + + /** + * Construct an instance of GetArrayFromMap case class + * extracting a value array from a Map expression. + * + * @param child a Map expression to extract a value array from + */ + def apply(child: Expression): Expression = { +GetArrayFromMap( + child, + "valueArray", + _.valueArray(), + { case MapType(_, vt, _) => vt }) + } +} + +/** + * Extracts a key/value array from a Map expression. 
+ * + * @param child a Map expression to extract an array from + * @param functionName name of the function that is invoked to extract an array + * @param arrayGetter function extracting `ArrayData` from `MapData` + * @param elementTypeGetter function extracting array element `DataType` from `MapType` + */ +case class GetArrayFromMap private( +child: Expression, +functionName: String, +arrayGetter: MapData => ArrayData, +elementTypeGetter: MapType => DataType) extends UnaryExpression with NonSQLExpression { + + private lazy val encodedFunctionName: String = TermName(functionName).encodedName.toString + + lazy val dataType: DataType = { +val mt: MapType = child.dataType.asInstanceOf[MapType] +ArrayType(elementTypeGetter(mt)) + } + + override def checkInputDataTypes(): TypeCheckResult = { +child.dataType match { + case MapType(_, _, _) => --- End diff -- Shall we just use if-else? ``` if (child.dataType.isInstanceOf[MapType]) ... else ... ```
[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22745#discussion_r227430882 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -1787,3 +1788,79 @@ case class ValidateExternalType(child: Expression, expected: DataType) ev.copy(code = code, isNull = input.isNull) } } + +object GetKeyArrayFromMap { + + /** + * Construct an instance of GetArrayFromMap case class + * extracting a key array from a Map expression. + * + * @param child a Map expression to extract a key array from + */ + def apply(child: Expression): Expression = { +GetArrayFromMap( + child, + "keyArray", + _.keyArray(), + { case MapType(kt, _, _) => kt }) + } +} + +object GetValueArrayFromMap { + + /** + * Construct an instance of GetArrayFromMap case class + * extracting a value array from a Map expression. + * + * @param child a Map expression to extract a value array from + */ + def apply(child: Expression): Expression = { +GetArrayFromMap( + child, + "valueArray", + _.valueArray(), + { case MapType(_, vt, _) => vt }) + } +} + +/** + * Extracts a key/value array from a Map expression. 
+ * + * @param child a Map expression to extract an array from + * @param functionName name of the function that is invoked to extract an array + * @param arrayGetter function extracting `ArrayData` from `MapData` + * @param elementTypeGetter function extracting array element `DataType` from `MapType` + */ +case class GetArrayFromMap private( +child: Expression, +functionName: String, +arrayGetter: MapData => ArrayData, +elementTypeGetter: MapType => DataType) extends UnaryExpression with NonSQLExpression { + + private lazy val encodedFunctionName: String = TermName(functionName).encodedName.toString + + lazy val dataType: DataType = { +val mt: MapType = child.dataType.asInstanceOf[MapType] +ArrayType(elementTypeGetter(mt)) + } + + override def checkInputDataTypes(): TypeCheckResult = { +child.dataType match { + case MapType(_, _, _) => --- End diff -- `case _: MapType` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
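The two alternatives raised for this check, a type pattern (`case _: MapType`) and a plain if-else on `isInstanceOf`, behave the same when none of the fields are needed. A minimal Spark-free sketch follows; the `DataType`, `MapType` and `TypeCheckResult` definitions are simplified stand-ins written for this illustration, not the Catalyst classes, and the failure message is made up.

```scala
// Simplified model; these types are illustrative and are NOT Spark's classes.
object TypeCheckSketch {
  sealed trait DataType
  case object IntegerType extends DataType
  final case class MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean)
      extends DataType

  sealed trait TypeCheckResult
  case object TypeCheckSuccess extends TypeCheckResult
  final case class TypeCheckFailure(message: String) extends TypeCheckResult

  // Type pattern: matches any MapType without destructuring its three fields,
  // unlike `case MapType(_, _, _)`.
  def checkWithPattern(dt: DataType): TypeCheckResult = dt match {
    case _: MapType => TypeCheckSuccess
    case other => TypeCheckFailure(s"need map type but got $other")
  }

  // Equivalent if-else form.
  def checkWithIfElse(dt: DataType): TypeCheckResult =
    if (dt.isInstanceOf[MapType]) TypeCheckSuccess
    else TypeCheckFailure(s"need map type but got $dt")
}
```

The type pattern also keeps compiling if a field is later added to the case class, whereas `MapType(_, _, _)` would need another wildcard.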
[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22745#discussion_r227407344 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -1787,3 +1787,72 @@ case class ValidateExternalType(child: Expression, expected: DataType) ev.copy(code = code, isNull = input.isNull) } } + +object GetKeyArrayFromMap { + + /** + * Construct an instance of GetArrayFromMap case class + * extracting a key array from a Map expression. + * + * @param child a Map expression to extract key array from + */ + def apply(child: Expression): Expression = { +GetArrayFromMap( + child, + "keyArray", + { case MapType(kt, _, _) => kt }, + _.keyArray()) + } +} + +object GetValueArrayFromMap { + + /** + * Construct an instance of GetArrayFromMap case class + * extracting a value array from a Map expression. + * + * @param child a Map expression to extract value array from + */ + def apply(child: Expression): Expression = { +GetArrayFromMap( + child, + "valueArray", + { case MapType(_, vt, _) => vt }, + _.valueArray()) + } +} + +/** + * Extracts a key/value array from a Map expression. + * + * @param child a Map expression to extract array from + * @param source source of array elements, can be `Key` or `Value` + */ +case class GetArrayFromMap private( +child: Expression, +functionName: String, +elementTypeGetter: MapType => DataType, +arrayGetter: MapData => ArrayData) extends UnaryExpression with NonSQLExpression { + + private lazy val encodedFunctionName: String = TermName(functionName).encodedName.toString --- End diff -- Then let's save it. Otherwise other reviewers may get confused as well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22785: [SPARK-25791][SQL] Datatype of serializers in RowEncoder...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22785 thanks, merging to master!
[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22144 Note that the `supportsPartial` flag was dropped in Spark 2.2, not 2.4. I'm not very familiar with the Hive code, so I don't know exactly how it is broken. The worst case is that Hive has some UDAFs that don't support partial aggregation, and Spark needs to adjust its aggregate framework. Or it's just that we incorrectly adapt Hive UDAFs to Spark aggregate functions, and we can simply work around it. I shouldn't state it as a feature; it's an ability of Spark's aggregate framework to stop partial aggregation for some functions. This fix is not ready. We should at least update the doc of `HiveUDAFFunction`, so that we can know where we misunderstand the Hive UDAF framework. If we were at Spark 2.2, we should definitely revert the PRs that caused this issue. But it's 2.4 now, and reverting very old commits is not safe. Personally, I don't think this is a blocker that we have to fix before releasing. It's not a correctness issue, it doesn't impact a lot of users, and it has been there for nearly 2 years.
[GitHub] spark issue #22745: [SPARK-25772][SQL] Fix java map of structs deserializati...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22745 LGTM except 2 minor comments
[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22745#discussion_r227391572 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -1787,3 +1787,72 @@ case class ValidateExternalType(child: Expression, expected: DataType) ev.copy(code = code, isNull = input.isNull) } } + +object GetKeyArrayFromMap { + + /** + * Construct an instance of GetArrayFromMap case class + * extracting a key array from a Map expression. + * + * @param child a Map expression to extract key array from + */ + def apply(child: Expression): Expression = { +GetArrayFromMap( + child, + "keyArray", + { case MapType(kt, _, _) => kt }, + _.keyArray()) + } +} + +object GetValueArrayFromMap { + + /** + * Construct an instance of GetArrayFromMap case class + * extracting a value array from a Map expression. + * + * @param child a Map expression to extract value array from + */ + def apply(child: Expression): Expression = { +GetArrayFromMap( + child, + "valueArray", + { case MapType(_, vt, _) => vt }, + _.valueArray()) + } +} + +/** + * Extracts a key/value array from a Map expression. + * + * @param child a Map expression to extract array from + * @param source source of array elements, can be `Key` or `Value` + */ +case class GetArrayFromMap private( +child: Expression, +functionName: String, +elementTypeGetter: MapType => DataType, +arrayGetter: MapData => ArrayData) extends UnaryExpression with NonSQLExpression { + + private lazy val encodedFunctionName: String = TermName(functionName).encodedName.toString --- End diff -- what is this doing? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22745#discussion_r227391434 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -1787,3 +1787,72 @@ case class ValidateExternalType(child: Expression, expected: DataType) ev.copy(code = code, isNull = input.isNull) } } + +object GetKeyArrayFromMap { + + /** + * Construct an instance of GetArrayFromMap case class + * extracting a key array from a Map expression. + * + * @param child a Map expression to extract key array from + */ + def apply(child: Expression): Expression = { +GetArrayFromMap( + child, + "keyArray", + { case MapType(kt, _, _) => kt }, + _.keyArray()) + } +} + +object GetValueArrayFromMap { + + /** + * Construct an instance of GetArrayFromMap case class + * extracting a value array from a Map expression. + * + * @param child a Map expression to extract value array from + */ + def apply(child: Expression): Expression = { +GetArrayFromMap( + child, + "valueArray", + { case MapType(_, vt, _) => vt }, + _.valueArray()) + } +} + +/** + * Extracts a key/value array from a Map expression. + * + * @param child a Map expression to extract array from + * @param source source of array elements, can be `Key` or `Value` + */ +case class GetArrayFromMap private( +child: Expression, +functionName: String, +elementTypeGetter: MapType => DataType, +arrayGetter: MapData => ArrayData) extends UnaryExpression with NonSQLExpression { + + private lazy val encodedFunctionName: String = TermName(functionName).encodedName.toString + + lazy val dataType: DataType = { +child.dataType match { + case mt @ MapType(_, _, _) => +ArrayType(elementTypeGetter(mt)) + case other => --- End diff -- let's do this check in ``` override def checkInputDataTypes ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22237: [SPARK-25243][SQL] Use FailureSafeParser in from_json
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22237 @HyukjinKwon are you working on it? @gengliangwang do you want to take over?
[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22745#discussion_r227357201 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -1787,3 +1787,75 @@ case class ValidateExternalType(child: Expression, expected: DataType) ev.copy(code = code, isNull = input.isNull) } } + +object GetArrayFromMap { + abstract class Source + case class Key() extends Source + case class Value() extends Source +} + +/** + * Extracts a key/value array from a Map expression. + * + * @param child a Map expression to extract array from + * @param source source of array elements, can be `Key` or `Value` + */ +case class GetArrayFromMap( +child: Expression, +source: GetArrayFromMap.Source) extends Expression with NonSQLExpression { + + import GetArrayFromMap._ + + private val functionName: String = source match { +case Key() => "keyArray" +case Value() => "valueArray" + } + + private lazy val encodedFunctionName = TermName(functionName).encodedName.toString + + override def nullable: Boolean = child.nullable + override def children: Seq[Expression] = child :: Nil + + lazy val dataType: DataType = { +child.dataType match { + case MapType(kt, vt, _) => +source match { + case Key() => ArrayType(kt) + case Value() => ArrayType(vt) +} + case other => +throw new RuntimeException( + s"Can't extract array from $child: need map type but got ${other.catalogString}") +} + } + + override def eval(input: InternalRow): Any = { +val mapObj = child.eval(input) +if (!mapObj.isInstanceOf[MapData]) { + throw new RuntimeException( +s"Can't extract array from $child: need map type but got ${mapObj.getClass}") +} +if (mapObj == null) { + null +} else { + val method = mapObj.getClass.getDeclaredMethod(functionName) + method.invoke(mapObj) +} + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { --- End diff -- make this expression extends `UnaryExpression`, then we can write ``` 
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = defineCodeGen(ctx, ev, map => s"$map.$functionName()") ```
[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22745#discussion_r227356448 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -1787,3 +1787,75 @@ case class ValidateExternalType(child: Expression, expected: DataType) ev.copy(code = code, isNull = input.isNull) } } + +object GetArrayFromMap { + abstract class Source + case class Key() extends Source + case class Value() extends Source +} + +/** + * Extracts a key/value array from a Map expression. + * + * @param child a Map expression to extract array from + * @param source source of array elements, can be `Key` or `Value` + */ +case class GetArrayFromMap( +child: Expression, +source: GetArrayFromMap.Source) extends Expression with NonSQLExpression { + + import GetArrayFromMap._ + + private val functionName: String = source match { +case Key() => "keyArray" +case Value() => "valueArray" + } + + private lazy val encodedFunctionName = TermName(functionName).encodedName.toString + + override def nullable: Boolean = child.nullable + override def children: Seq[Expression] = child :: Nil + + lazy val dataType: DataType = { +child.dataType match { + case MapType(kt, vt, _) => +source match { + case Key() => ArrayType(kt) + case Value() => ArrayType(vt) +} + case other => +throw new RuntimeException( + s"Can't extract array from $child: need map type but got ${other.catalogString}") +} + } + + override def eval(input: InternalRow): Any = { +val mapObj = child.eval(input) +if (!mapObj.isInstanceOf[MapData]) { + throw new RuntimeException( +s"Can't extract array from $child: need map type but got ${mapObj.getClass}") +} +if (mapObj == null) { + null +} else { + val method = mapObj.getClass.getDeclaredMethod(functionName) --- End diff -- well this does reuse the `functionName`, but performance is more important here. how about ``` private lazy val arrayGetter: MapData => ArrayData = if (source) ... def eval... 
arrayGetter(input.asInstanceOf[MapData]) ```
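A rough, self-contained sketch of the idea in this comment: pick the accessor function once per expression instead of doing a reflective method lookup on every row. `SimpleMapData`, `SimpleArrayData`, `ExtractSource` and `ArrayExtractor` are hypothetical stand-ins made up for this illustration, not Spark's `MapData`/`ArrayData` or the code that was eventually merged.

```scala
// Simplified stand-ins; not Spark's MapData/ArrayData.
final case class SimpleArrayData(values: Vector[Any])
final case class SimpleMapData(keys: Vector[Any], vals: Vector[Any]) {
  def keyArray(): SimpleArrayData = SimpleArrayData(keys)
  def valueArray(): SimpleArrayData = SimpleArrayData(vals)
}

sealed trait ExtractSource
case object FromKeys extends ExtractSource
case object FromValues extends ExtractSource

final class ArrayExtractor(source: ExtractSource) {
  // Resolved once per extractor instance, not once per row.
  private lazy val arrayGetter: SimpleMapData => SimpleArrayData = source match {
    case FromKeys => _.keyArray()
    case FromValues => _.valueArray()
  }

  // Null-safe evaluation: a null map yields null, otherwise a direct call.
  def eval(mapObj: Any): Any =
    if (mapObj == null) null else arrayGetter(mapObj.asInstanceOf[SimpleMapData])
}
```

The per-row work is then a null check, a cast, and a plain function call, with no `getDeclaredMethod` or `invoke` on the hot path.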
[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22745#discussion_r227355778 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -1787,3 +1787,75 @@ case class ValidateExternalType(child: Expression, expected: DataType) ev.copy(code = code, isNull = input.isNull) } } + +object GetArrayFromMap { + abstract class Source + case class Key() extends Source + case class Value() extends Source +} + +/** + * Extracts a key/value array from a Map expression. + * + * @param child a Map expression to extract array from + * @param source source of array elements, can be `Key` or `Value` + */ +case class GetArrayFromMap( +child: Expression, +source: GetArrayFromMap.Source) extends Expression with NonSQLExpression { + + import GetArrayFromMap._ + + private val functionName: String = source match { +case Key() => "keyArray" +case Value() => "valueArray" + } + + private lazy val encodedFunctionName = TermName(functionName).encodedName.toString + + override def nullable: Boolean = child.nullable + override def children: Seq[Expression] = child :: Nil + + lazy val dataType: DataType = { +child.dataType match { + case MapType(kt, vt, _) => +source match { + case Key() => ArrayType(kt) + case Value() => ArrayType(vt) +} + case other => +throw new RuntimeException( + s"Can't extract array from $child: need map type but got ${other.catalogString}") +} + } + + override def eval(input: InternalRow): Any = { +val mapObj = child.eval(input) +if (!mapObj.isInstanceOf[MapData]) { --- End diff -- we don't need this check. `eval` is performance critical and we should assume there is no bug. We don't have this check in other expressions either. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22745: [SPARK-25772][SQL] Fix java map of structs deseri...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22745#discussion_r227355335 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -1787,3 +1787,75 @@ case class ValidateExternalType(child: Expression, expected: DataType) ev.copy(code = code, isNull = input.isNull) } } + +object GetArrayFromMap { + abstract class Source + case class Key() extends Source + case class Value() extends Source --- End diff -- I don't have a strong opinion, but shall we use enum? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
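For reference, one way the "use enum" option could look with Scala's built-in `Enumeration`. The names `MapSide`, `KeySide`, `ValueSide` and `MapSideNames` are hypothetical and chosen only for this sketch; the PR itself may have settled on a different design.

```scala
// Hypothetical replacement for the Key()/Value() case classes in the diff.
object MapSide extends Enumeration {
  val KeySide, ValueSide = Value
}

object MapSideNames {
  // Maps each enum value to the accessor name used in the diff.
  def functionName(side: MapSide.Value): String = side match {
    case MapSide.KeySide => "keyArray"
    case MapSide.ValueSide => "valueArray"
  }
}
```

One trade-off worth noting: the compiler does not check matches on `Enumeration` values for exhaustiveness, whereas a `sealed trait` with `case object`s does get that check.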
[GitHub] spark issue #22144: [SPARK-24935][SQL] : Problem with Executing Hive UDF's f...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22144 My feeling is that Hive compatibility is not that important to Spark at this point. *ALL* aggregate functions in Spark (including Spark UDAF) support partial aggregation, but now we would need to complicate the aggregation framework to support un-partial-able aggregate functions, only for a few Hive UDAFs. Unless there is a simple workaround, or we can justify that Spark needs un-partial-able aggregate functions, IMO it's not worth introducing this feature. BTW this PR doesn't even have a test, so I'm not sure if we can have a simple workaround for it.
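To make "partial aggregate" concrete, here is a small self-contained sketch of an average computed in two phases: each partition builds its own buffer, and the buffers are then merged. `AvgBuffer` and `PartialAvg` are hypothetical names for this illustration and do not mirror Spark's aggregation classes.

```scala
// Intermediate state for an average: it can be updated per row and merged across partitions.
final case class AvgBuffer(sum: Double, count: Long) {
  def update(x: Double): AvgBuffer = AvgBuffer(sum + x, count + 1)
  def merge(other: AvgBuffer): AvgBuffer = AvgBuffer(sum + other.sum, count + other.count)
  def result: Double = if (count == 0) Double.NaN else sum / count
}

object PartialAvg {
  private val empty = AvgBuffer(0.0, 0L)

  // Phase 1: one buffer per partition. Phase 2: merge the buffers.
  def aggregate(partitions: Seq[Seq[Double]]): Double =
    partitions
      .map(_.foldLeft(empty)(_ update _))
      .foldLeft(empty)(_ merge _)
      .result
}
```

An aggregate function that cannot expose a mergeable buffer like this has to see all of its input rows in one place, which is the "un-partial-able" case the comment is reluctant to support.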
[GitHub] spark pull request #22778: [SPARK-25784][SQL] Infer filters from constraints...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22778#discussion_r227261253 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -171,9 +171,11 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) // "Extract PythonUDF From JoinCondition". Batch("Check Cartesian Products", Once, CheckCartesianProducts) :+ -Batch("RewriteSubquery", Once, +Batch("Rewrite Subquery", Once, RewritePredicateSubquery, ColumnPruning, + InferFiltersFromConstraints, + PushDownPredicate, --- End diff -- looks good, cc @gatorsmile @maryannxue
[GitHub] spark issue #22788: [SPARK-25769][SQL]escape nested columns by backtick each...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22788 yea, only use JSON if it's a nested column.
[GitHub] spark pull request #21156: [SPARK-24087][SQL] Avoid shuffle when join keys a...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21156#discussion_r227253732 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinUtils.scala --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.joins + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, HashClusteredDistribution, HashPartitioning} +import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} +import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec + +object JoinUtils { + private def avoidShuffleIfPossible( + joinKeys: Seq[Expression], + expressions: Seq[Expression], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression]): Seq[Distribution] = { +val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x))) +HashClusteredDistribution(indices.map(leftKeys(_))) :: + HashClusteredDistribution(indices.map(rightKeys(_))) :: Nil + } + + def requiredChildDistributionForShuffledJoin( + partitioningDetection: Boolean, + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + left: SparkPlan, + right: SparkPlan): Seq[Distribution] = { +if (!partitioningDetection) { + return HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil +} + +val leftPartitioning = left.outputPartitioning +val rightPartitioning = right.outputPartitioning --- End diff -- This is my biggest concern. Currently Spark adds shuffle with a rule, so we can't always get the children partitioning precisely. We implemented a similar feature in `EnsureRequirements.reorderJoinPredicates`, which is hacky and we should improve the framework before adding more features like this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22797: [SPARK-19851][SQL] Add support for EVERY and ANY (SOME) ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22797 yea, just a special rule instead of a general agg function rewrite framework.
[GitHub] spark issue #22788: [SPARK-25769][SQL]escape nested columns by backtick each...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22788 Yea I think so, we can even use JSON to be safer. e.g. for `a.b.c.d`, we can encode it as a JSON array `["a","b","c","d"]`. At the data source side, use a JSON parser to read it back.
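A minimal sketch of the JSON-array encoding idea, to show why it is safer than joining name parts with dots: a part that itself contains a dot or a backtick survives the round trip. `ColumnPath` is a hypothetical helper written for this illustration. It assumes names contain no control characters and handles only the backslash and double-quote escapes; a real data source would use a full JSON parser, as the comment says.

```scala
object ColumnPath {
  // Encode name parts as a JSON array of strings, escaping backslash and double quote.
  def encode(parts: Seq[String]): String =
    parts.map { p =>
      "\"" + p.replace("\\", "\\\\").replace("\"", "\\\"") + "\""
    }.mkString("[", ",", "]")

  // Decode the output of `encode`. Handles only the two escapes produced above.
  def decode(json: String): Seq[String] = {
    val out = Vector.newBuilder[String]
    val cur = new StringBuilder
    var inString = false
    var i = 0
    while (i < json.length) {
      val c = json.charAt(i)
      if (inString) {
        if (c == '\\') {
          // Escaped character: take the next character literally.
          cur.append(json.charAt(i + 1))
          i += 1
        } else if (c == '"') {
          // Closing quote: the current part is complete.
          out += cur.toString
          cur.clear()
          inString = false
        } else {
          cur.append(c)
        }
      } else if (c == '"') {
        inString = true
      }
      i += 1
    }
    out.result()
  }
}
```

For example, `ColumnPath.encode(Seq("a", "b", "c", "d"))` yields the array form mentioned in the comment, and a part such as `a.b` stays a single element after `decode`.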
[GitHub] spark pull request #22775: [SPARK-24709][SQL][FOLLOW-UP] Make schema_of_json...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22775#discussion_r227243187 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -770,8 +776,17 @@ case class SchemaOfJson( factory } - override def convert(v: UTF8String): UTF8String = { -val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, v)) { parser => + @transient + private lazy val json = child.eval().asInstanceOf[UTF8String] --- End diff -- so `schema_of_json` must be used with `from_json`? I don't feel strongly about enforcing it. It's also weird that users would be willing to write a verbose JSON literal in `from_json(..., schema = schema_of_json(...))` instead of a DDL string.
[GitHub] spark issue #22799: [SPARK-25805][SQL][TEST] Fix test for SPARK-25159
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22799 thanks, merging to master/2.4!
[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22512#discussion_r22723 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala --- @@ -49,10 +51,54 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable def currentValue: InternalRow = mutableRow override def target(row: InternalRow): MutableProjection = { +// If `mutableRow` is `UnsafeRow`, `MutableProjection` accepts fixed-length types only +assert(!row.isInstanceOf[UnsafeRow] || + validExprs.forall { case (e, _) => UnsafeRow.isFixedLength(e.dataType) }) mutableRow = row this } + private[this] val fieldWriters = validExprs.map { case (e, i) => +val writer = generateRowWriter(i, e.dataType) +if (!e.nullable) { + (v: Any) => writer(v) +} else { + (v: Any) => { +if (v == null) { + mutableRow.setNullAt(i) +} else { + writer(v) +} + } +} + } + + private def generateRowWriter(ordinal: Int, dt: DataType): Any => Unit = dt match { +case BooleanType => + v => mutableRow.setBoolean(ordinal, v.asInstanceOf[Boolean]) +case ByteType => + v => mutableRow.setByte(ordinal, v.asInstanceOf[Byte]) +case ShortType => + v => mutableRow.setShort(ordinal, v.asInstanceOf[Short]) +case IntegerType | DateType => + v => mutableRow.setInt(ordinal, v.asInstanceOf[Int]) +case LongType | TimestampType => + v => mutableRow.setLong(ordinal, v.asInstanceOf[Long]) +case FloatType => + v => mutableRow.setFloat(ordinal, v.asInstanceOf[Float]) +case DoubleType => + v => mutableRow.setDouble(ordinal, v.asInstanceOf[Double]) +case DecimalType.Fixed(precision, _) => + v => mutableRow.setDecimal(ordinal, v.asInstanceOf[Decimal], precision) +case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType | + _: MapType | _: UserDefinedType[_] => + v => mutableRow.update(ordinal, v) +case NullType => + v => {} --- End diff -- the corresponding logic in the codegen version simply calls `row.update(null, i)`.