[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22758#discussion_r226151421 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala --- @@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log None) val logicalRelation = cached.getOrElse { val updatedTable = inferIfNeeded(relation, options, fileFormat) + // Intialize the catalogTable stats if its not defined.An intial value has to be defined --- End diff -- I don't quite understand why table must have stats. For both file sources and hive tables, we will estimate the data size with files, if the table doesn't have stats. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
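For context, one way to inspect the size estimate referred to here (illustrative Scala; `some_table` is a placeholder name):

```
// When the catalog has no stats, the optimizer falls back to a file-size based estimate.
val plan = spark.table("some_table").queryExecution.optimizedPlan
println(plan.stats.sizeInBytes)
```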
[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22758#discussion_r226150341 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala --- @@ -1051,7 +1051,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto test("test statistics of LogicalRelation converted from Hive serde tables") { Seq("orc", "parquet").foreach { format => - Seq(true, false).foreach { isConverted => + // Botth parquet and orc will have Hivestatistics, both are convertable to Logical Relation. + Seq(true, true).foreach { isConverted => --- End diff -- This is to test when the conversion is on and off. We shouldn't change it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22745: [SPARK-21402][SQL][FOLLOW-UP] Fix java map of str...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22745#discussion_r226149644 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala --- @@ -509,3 +509,24 @@ case class UnresolvedOrdinal(ordinal: Int) override def nullable: Boolean = throw new UnresolvedException(this, "nullable") override lazy val resolved = false } + +/** + * When constructing `Invoke`, the data type must be given, which may be not possible to define + * before analysis. This class acts like a placeholder for `Invoke`, and will be replaced by + * `Invoke` during analysis after the input data is resolved. Data type passed to `Invoke`` + * will be defined by applying `dataTypeFunction` to the data type of the input data. + */ +case class UnresolvedInvoke( --- End diff -- I feel this is too general. Maybe we should just create a new expression `GetArrayFromMap` and resolve it to `Invoke` later. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22309: [SPARK-20384][SQL] Support value class in schema of Data...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22309 what's still missing to support top level value class? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22309#discussion_r226149168 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala --- @@ -632,13 +667,17 @@ object ScalaReflection extends ScalaReflection { "cannot be used as field name\n" + walkedTypePath.mkString("\n")) } + // as a field, value class is represented by its underlying type + val trueFieldType = +if (isValueClass(fieldType)) getUnderlyingTypeOf(fieldType) else fieldType --- End diff -- shall we update `dataTypeFor` to handle value class, instead of special handling it here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22309#discussion_r226148985 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala --- @@ -376,6 +386,23 @@ object ScalaReflection extends ScalaReflection { dataType = ObjectType(udt.getClass)) Invoke(obj, "deserialize", ObjectType(udt.userClass), getPath :: Nil) + case t if isValueClass(t) => +// nested value class is treated as its underlying type +// top level value class must be treated as a product +val underlyingType = getUnderlyingTypeOf(t) +val underlyingClsName = getClassNameFromType(underlyingType) +val clsName = getUnerasedClassNameFromType(t) +val newTypePath = s"""- Scala value class: $clsName($underlyingClsName)""" +: --- End diff -- do you have an example of this message? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22309#discussion_r226148892 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala --- @@ -376,6 +386,23 @@ object ScalaReflection extends ScalaReflection { dataType = ObjectType(udt.getClass)) Invoke(obj, "deserialize", ObjectType(udt.userClass), getPath :: Nil) + case t if isValueClass(t) => +// nested value class is treated as its underlying type +// top level value class must be treated as a product --- End diff -- This is not true, top level value class can be a single primitive type column --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
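For readers unfamiliar with the term, a minimal hypothetical example of a Scala value class; as the top-level type of a Dataset it would correspond to a single primitive column, which is the case being pointed out:

```
// Hypothetical example: a value class wrapping a single primitive.
case class Meters(value: Double) extends AnyVal
// As Dataset[Meters], this would map to a single DoubleType column, not a struct.
```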
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22732#discussion_r226143221 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala --- @@ -393,4 +393,30 @@ class UDFSuite extends QueryTest with SharedSQLContext { checkAnswer(df, Seq(Row("12"), Row("24"), Row("3null"), Row(null))) } } + + test("SPARK-25044 Verify null input handling for primitive types - with udf()") { +val udf1 = udf({(x: Long, y: Any) => x * 2 + (if (y == null) 1 else 0)}) +val df = spark.range(0, 3).toDF("a") + .withColumn("b", udf1($"a", lit(null))) + .withColumn("c", udf1(lit(null), $"a")) + +checkAnswer( + df, + Seq( +Row(0, 1, null), +Row(1, 3, null), +Row(2, 5, null))) + } + + test("SPARK-25044 Verify null input handling for primitive types - with udf.register") { +withTable("t") { --- End diff -- we can use temp view ``` withTempView("v") { Seq((null, 1, "x"), ("N", null, "y"), ("N", 3, null)).toDF("a", "b", "c").createTempView("v") ... } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22732#discussion_r226142651 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala --- @@ -73,19 +73,21 @@ case class UserDefinedFunction protected[sql] ( */ @scala.annotation.varargs def apply(exprs: Column*): Column = { -if (inputTypes.isDefined && nullableTypes.isDefined) { - require(inputTypes.get.length == nullableTypes.get.length) +val numOfArgs = ScalaReflection.getParameterCount(f) --- End diff -- since we are here, shall we also check `exprs.length == numOfArgs`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22732#discussion_r226141983 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala --- @@ -39,29 +42,29 @@ import org.apache.spark.sql.types.DataType * @param nullable True if the UDF can return null value. * @param udfDeterministic True if the UDF is deterministic. Deterministic UDF returns same result * each time it is invoked with a particular input. - * @param nullableTypes which of the inputTypes are nullable (i.e. not primitive) */ case class ScalaUDF( function: AnyRef, dataType: DataType, children: Seq[Expression], +inputsNullSafe: Seq[Boolean], inputTypes: Seq[DataType] = Nil, udfName: Option[String] = None, nullable: Boolean = true, -udfDeterministic: Boolean = true, -nullableTypes: Seq[Boolean] = Nil) +udfDeterministic: Boolean = true) extends Expression with ImplicitCastInputTypes with NonSQLExpression with UserDefinedExpression { // The constructor for SPARK 2.1 and 2.2 def this( function: AnyRef, dataType: DataType, children: Seq[Expression], + inputsNullSafe: Seq[Boolean], --- End diff -- this constructor is here for backward compatibility. If we have to change it, I don't think we need to keep it anymore. cc @gatorsmile --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22756: [SPARK-25758][ML] Deprecate computeCost on Bisect...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22756#discussion_r226141315 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala --- @@ -125,8 +125,13 @@ class BisectingKMeansModel private[ml] ( /** * Computes the sum of squared distances between the input points and their corresponding cluster * centers. + * + * @deprecated This method is deprecated and will be removed in 3.0.0. Use ClusteringEvaluator + * instead. You can also get the cost on the training dataset in the summary. */ @Since("2.0.0") + @deprecated("This method is deprecated and will be removed in 3.0.0. Use ClusteringEvaluator " + --- End diff -- It looks reasonable to me to deprecate it in 2.4 so that we can remove it in 3.0, if this is the last one. Then we can have a consistent ML API in 3.0 after removing these deprecated APIs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22721: [SPARK-25403][SQL] Refreshes the table after inserting t...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22721 What's the impact on end users? Wrong statistics? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Support par...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20999 It seems like Hive can drop partitions directly with a partition expression: https://github.com/apache/hive/blob/master/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java#L1016 can you double check this part? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22732#discussion_r225952267 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala --- @@ -39,29 +40,29 @@ import org.apache.spark.sql.types.DataType * @param nullable True if the UDF can return null value. * @param udfDeterministic True if the UDF is deterministic. Deterministic UDF returns same result * each time it is invoked with a particular input. - * @param nullableTypes which of the inputTypes are nullable (i.e. not primitive) */ case class ScalaUDF( function: AnyRef, dataType: DataType, children: Seq[Expression], +handleNullForInputs: Seq[Boolean], inputTypes: Seq[DataType] = Nil, udfName: Option[String] = None, nullable: Boolean = true, -udfDeterministic: Boolean = true, -nullableTypes: Seq[Boolean] = Nil) +udfDeterministic: Boolean = true) extends Expression with ImplicitCastInputTypes with NonSQLExpression with UserDefinedExpression { // The constructor for SPARK 2.1 and 2.2 def this( function: AnyRef, dataType: DataType, children: Seq[Expression], + handleNullForInputs: Seq[Boolean], --- End diff -- I think we should just remove this constructor. It's weird to keep backward compatibility for a private class, and I don't think it can work anymore. It's not OK to omit the nullable info. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF constructor sig...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22732 After more thought, there is one case we don't handle well. For UDFs like `(a: Any, i: Int) => xxx`, previously we could still get the int type info at runtime and add a null check for it. Now we can't get the type info at compile time, because we do something like `Try(ScalaReflection.schemaFor[A1] :: ScalaReflection.schemaFor[A2] :: Nil).toOption`: when we fail to get the type info of `Any`, we just give up and don't try the next input. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
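A minimal sketch of the alternative this points at, assuming the fix is to resolve each parameter's type independently so that an unresolvable type such as `Any` doesn't discard the type info of the remaining parameters (illustrative names, not the actual Spark code):

```
import scala.reflect.runtime.universe.TypeTag
import scala.util.Try
import org.apache.spark.sql.catalyst.ScalaReflection

// Sketch only: one Try per parameter instead of one Try around the whole list,
// so `Any` for A1 still leaves the Int type info of A2 available.
def inputSchemas[A1: TypeTag, A2: TypeTag]: Seq[Option[ScalaReflection.Schema]] = Seq(
  Try(ScalaReflection.schemaFor[A1]).toOption,
  Try(ScalaReflection.schemaFor[A2]).toOption
)
```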
[GitHub] spark issue #22708: [SPARK-21402][SQL] Fix java array of structs deserializa...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22708 thanks, merging to master/2.4! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22674: [SPARK-25680][SQL] SQL execution listener shouldn't happ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22674 since there is no objection, I'm merging it to master, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22750: [SPARK-25747][SQL] remove ColumnarBatchScan.needsUnsafeR...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22750 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF constructor sig...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22732 There is an argument about whether #22259 introduced behavior changes. Here is my analysis.

Before #22259, the type and null checks were done as follows:
1. The user registers UDFs, like `(a: Int) => xxx` or `(b: Any) => xxx`.
2. At compile time, we get the type info and set the input types, so that the analyzer can add a cast or fail the query if the real input data type doesn't match. Note that UDFs like `(b: Any) => xxx` have no type info, so no type check is done for them.
3. At runtime, we use reflection to get the type info again, and add a null check if an input is of primitive type.

After #22259, the type and null checks are done as follows:
1. The user registers UDFs, like `(a: Int) => xxx` or `(b: Any) => xxx`.
2. At compile time, we get the type info and set the input types and input nullability, so that the analyzer can add a cast or fail the query if the real input data type doesn't match, and add a null check if necessary. Note that UDFs like `(b: Any) => xxx` have no type info, so no type or null check is done for them.

So we may have a behavior change if users register UDFs in a weird way, e.g. they define `(a: Int) => xxx` but cast it to `Any => xx` during registration; then we can't get the real type info at compile time. I'd say this is an invalid use case, because the data type check is also lost. I think we don't need to treat it as a behavior change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
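A hedged illustration of the "weird" registration described above (the function and names are made up):

```
import org.apache.spark.sql.functions.udf

// The function really takes an Int, but the value handed to udf() is typed
// Any => Int, so compile-time reflection can't see a usable input type,
// and neither a type check nor a null check is generated.
val f = ((a: Int) => a * 2).asInstanceOf[Any => Int]
val u = udf(f)
```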
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22732#discussion_r225807388 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala --- @@ -31,6 +31,9 @@ import org.apache.spark.sql.types.DataType * null. Use boxed type or [[Option]] if you wanna do the null-handling yourself. * @param dataType Return type of function. * @param children The input expressions of this UDF. + * @param handleNullForInputs Whether the inputs need null-value handling, which preserves the null --- End diff -- `inputsNullSafe` SGTM, let's also rename `UserDefinedFunction.nullableTypes` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22732#discussion_r225804995 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala --- @@ -81,11 +81,11 @@ case class UserDefinedFunction protected[sql] ( f, dataType, exprs.map(_.expr), + nullableTypes.map(_.map(!_)).getOrElse(exprs.map(_ => false)), --- End diff -- If we can't get the type information at compile time(e.g. `(a: Any) => xxx`), I don't think `ScalaReflection.getParameterTypes` can get something useful at runtime. For this case we won't add null check anyway, so I think there is no behavior change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
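A small sketch of why runtime reflection doesn't help in that case either (illustrative, not Spark source): for a function typed `Any => Int`, the erased parameter class of every `apply` overload is `java.lang.Object`, so the primitive intent can't be recovered.

```
// Every apply variant of an Any => Int function takes java.lang.Object.
val g: Any => Int = (a: Any) => a.hashCode
val paramClasses = g.getClass.getMethods.toSeq
  .filter(_.getName == "apply")
  .map(_.getParameterTypes.toList)
  .distinct
println(paramClasses)  // only java.lang.Object parameters, no Int to be found
```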
[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21322 I'd like to recommend http://apache-spark-developers-list.1001551.n3.nabble.com/SPIP-SPARK-25728-Structured-Intermediate-Representation-Tungsten-IR-for-generating-Java-code-td25370.html --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22746: [SPARK-24499][SQL][DOC] Split the page of sql-programmin...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22746 This is very cool! thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22746: [SPARK-24499][SQL][DOC] Split the page of sql-pro...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22746#discussion_r225797461 --- Diff: docs/_data/menu-sql.yaml --- @@ -0,0 +1,79 @@ +- text: Getting Started + url: sql-getting-started.html + subitems: +- text: "Starting Point: SparkSession" + url: sql-getting-started.html#starting-point-sparksession +- text: Creating DataFrames + url: sql-getting-started.html#creating-dataframes +- text: Untyped Dataset Operations --- End diff -- how about `Untyped Dataset Operations (DataFrame operations)` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22724: [SPARK-25734][SQL] Literal should have a value correspon...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22724 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22708: [SPARK-21402][SQL] Fix java array of structs dese...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22708#discussion_r225767103 --- Diff: sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanWithArraySuite.java --- @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.org.apache.spark.sql; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.test.TestSparkSession; +import org.apache.spark.sql.types.ArrayType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +public class JavaBeanWithArraySuite { + +private static final List RECORDS = new ArrayList<>(); + +static { +RECORDS.add(new Record(1, +Arrays.asList(new Interval(111, 211), new Interval(121, 221)), +Arrays.asList(11, 21, 31, 41) +)); +RECORDS.add(new Record(2, +Arrays.asList(new Interval(112, 212), new Interval(122, 222)), +Arrays.asList(12, 22, 32, 42) +)); +RECORDS.add(new Record(3, +Arrays.asList(new Interval(113, 213), new Interval(123, 223)), +Arrays.asList(13, 23, 33, 43) +)); +} + +private TestSparkSession spark; + +@Before +public void setUp() { +spark = new TestSparkSession(); +} + +@After +public void tearDown() { +spark.stop(); +spark = null; +} + +@Test +public void testBeanWithArrayFieldsDeserialization() { + +StructType schema = createSchema(); +Encoder encoder = Encoders.bean(Record.class); + +Dataset dataset = spark +.read() +.format("json") +.schema(schema) +.load("src/test/resources/test-data/with-array-fields") +.as(encoder); + +List records = dataset.collectAsList(); + +Assert.assertTrue(Util.equals(records, RECORDS)); +} + +private static StructType createSchema() { +StructField[] intervalFields = { +new StructField("startTime", DataTypes.LongType, true, Metadata.empty()), +new StructField("endTime", DataTypes.LongType, true, Metadata.empty()) +}; +DataType intervalType = new StructType(intervalFields); + +DataType intervalsType = new ArrayType(intervalType, true); + +DataType valuesType = new ArrayType(DataTypes.IntegerType, true); + +StructField[] fields = { +new StructField("id", DataTypes.IntegerType, true, Metadata.empty()), +new StructField("intervals", intervalsType, true, Metadata.empty()), +new StructField("values", valuesType, true, Metadata.empty()) +}; +return new StructType(fields); +} + +public static class 
Record { + +private int id; +private List intervals; +private List values; + +public Record() { } + +Record(int id, List intervals, List values) { +this.id = id; +this.intervals = intervals; +this.values = values; +} + +public int
[GitHub] spark issue #22745: [SPARK-21402][SQL][FOLLOW-UP] Fix java map of structs de...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22745 It's a different issue; I think it's worth a new ticket. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22732#discussion_r225614430 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala --- @@ -31,6 +31,7 @@ import org.apache.spark.sql.types.DataType * null. Use boxed type or [[Option]] if you wanna do the null-handling yourself. * @param dataType Return type of function. * @param children The input expressions of this UDF. + * @param handleNullForInputs Whether the inputs need null-value handling respectively. --- End diff -- how about `acceptNullInputs`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22750: [SPARK-25747][SQL] remove ColumnarBatchScan.needsUnsafeR...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22750 cc @gatorsmile @hvanhovell --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22750: [SPARK-25747][SQL] remove ColumnarBatchScan.needs...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/22750 [SPARK-25747][SQL] remove ColumnarBatchScan.needsUnsafeRowConversion ## What changes were proposed in this pull request? `needsUnsafeRowConversion` is used in 2 places: 1. `ColumnarBatchScan.produceRows` 2. `FileSourceScanExec.doExecute` When we go to `ColumnarBatchScan.produceRows`, it means whole stage codegen is on but the vectorized reader is off. The vectorized reader can be off for several reasons: 1. the file format doesn't have a vectorized reader(json, csv, etc.) 2. the vectorized reader config is off 3. the schema is not supported Anyway when the vectorized reader is off, file format reader will always return unsafe rows, so `ColumnarBatchScan.needsUnsafeRowConversion` is not needed. When we go to `FileSourceScanExec.doExecute`, it means whole stage codegen is off. For this case, we need the `needsUnsafeRowConversion` to convert `ColumnarRow` to `UnsafeRow`, if the file format reader returns batch. This PR removes `ColumnarBatchScan.needsUnsafeRowConversion`, and keep this flag only in `FileSourceScanExec` ## How was this patch tested? existing tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark minor Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22750.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22750 commit 27e6b974192596baa86ac8b38b28c56e65e3c184 Author: Wenchen Fan Date: 2018-10-16T15:55:06Z remove ColumnarBatchScan.needsUnsafeRowConversion --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
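A rough sketch (not the actual Spark source) of the conversion that `FileSourceScanExec.doExecute` keeps the flag for: when the reader returns batches but whole-stage codegen is off, each `ColumnarRow` pulled out of the batch is copied into an `UnsafeRow` before being handed downstream.

```
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.StructType

// Sketch only: project arbitrary InternalRows (e.g. ColumnarRows) into UnsafeRows.
def toUnsafe(rows: Iterator[InternalRow], schema: StructType): Iterator[InternalRow] = {
  val proj = UnsafeProjection.create(schema)
  rows.map(row => proj(row).copy())  // copy because the projection reuses its buffer
}
```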
[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21322 I think a design doc is better, to make sure we are on the same page before the actual coding, which saves time. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22708: [SPARK-21402][SQL] Fix java array of structs deserializa...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22708 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22708: [SPARK-21402][SQL] Fix java array of structs deserializa...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22708 cc @dongjoon-hyun here is another instance of the FileBasedDataSourceSuite flaky test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21322 Ah, I see. It's kind of a broadcast, but it has a much smaller scope and its lifecycle is bound to a stage. I'm looking forward to a full design of it, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22724: [SPARK-25734][SQL] Literal should have a value correspon...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22724 Yea, +1 on 3.0 only. This is kind of a developer API; advanced users may use it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r225522164 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala --- @@ -31,48 +30,7 @@ import org.apache.spark.sql.types._ @BeanInfo private[sql] case class MyLabeledPoint( @BeanProperty label: Double, - @BeanProperty features: UDT.MyDenseVector) - -// Wrapped in an object to check Scala compatibility. See SPARK-13929 -object UDT { --- End diff -- why do we change it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r225521786 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData} +import org.apache.spark.sql.types._ + + +/** + * An interpreted version of a safe projection. + * + * @param expressions that produces the resulting fields. These expressions must be bound + *to a schema. + */ +class InterpretedSafeProjection(expressions: Seq[Expression]) extends Projection { + + private[this] val mutableRow = new SpecificInternalRow(expressions.map(_.dataType)) + + private[this] val exprsWithWriters = expressions.zipWithIndex.filter { +case (NoOp, _) => false +case _ => true + }.map { case (e, i) => +val converter = generateSafeValueConverter(e.dataType) +val writer = generateRowWriter(i, e.dataType) +val f = if (!e.nullable) { + (v: Any) => writer(converter(v)) +} else { + (v: Any) => { +if (v == null) { + mutableRow.setNullAt(i) +} else { + writer(converter(v)) +} + } +} +(e, f) + } + + private def isPrimitive(dataType: DataType): Boolean = dataType match { +case BooleanType => true +case ByteType => true +case ShortType => true +case IntegerType => true +case LongType => true +case FloatType => true +case DoubleType => true +case _ => false + } + + private def generateSafeValueConverter(dt: DataType): Any => Any = dt match { +case ArrayType(elemType, _) => + if (isPrimitive(elemType)) { +v => { + val arrayValue = v.asInstanceOf[ArrayData] + new GenericArrayData(arrayValue.toArray[Any](elemType)) +} + } else { +val elementConverter = generateSafeValueConverter(elemType) +v => { + val arrayValue = v.asInstanceOf[ArrayData] + val result = new Array[Any](arrayValue.numElements()) + arrayValue.foreach(elemType, (i, e) => { +result(i) = elementConverter(e) + }) + new GenericArrayData(result) +} + } + +case st: StructType => + val fieldTypes = st.fields.map(_.dataType) + val fieldConverters = fieldTypes.map(generateSafeValueConverter) + v => { +val row = v.asInstanceOf[InternalRow] +val ar = new Array[Any](row.numFields) +var idx = 0 +while (idx < row.numFields) { + ar(idx) = fieldConverters(idx)(row.get(idx, fieldTypes(idx))) + idx += 1 +} +new GenericInternalRow(ar) + } + +case MapType(keyType, valueType, _) => + lazy val keyConverter = generateSafeValueConverter(keyType) + lazy val valueConverter = 
generateSafeValueConverter(valueType) + v => { +val mapValue = v.asInstanceOf[MapData] +val keys = mapValue.keyArray().toArray[Any](keyType) +val values = mapValue.valueArray().toArray[Any](valueType) +val convertedKeys = + if (isPrimitive(keyType)) keys else keys.map(keyConverter) +val convertedValues = + if (isPrimitive(valueType)) values else values.map(valueConverter) + +ArrayBasedMapData(convertedKeys, convertedValues) +
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r225521397 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData} +import org.apache.spark.sql.types._ + + +/** + * An interpreted version of a safe projection. + * + * @param expressions that produces the resulting fields. These expressions must be bound + *to a schema. + */ +class InterpretedSafeProjection(expressions: Seq[Expression]) extends Projection { + + private[this] val mutableRow = new SpecificInternalRow(expressions.map(_.dataType)) + + private[this] val exprsWithWriters = expressions.zipWithIndex.filter { +case (NoOp, _) => false +case _ => true + }.map { case (e, i) => +val converter = generateSafeValueConverter(e.dataType) +val writer = generateRowWriter(i, e.dataType) +val f = if (!e.nullable) { + (v: Any) => writer(converter(v)) +} else { + (v: Any) => { +if (v == null) { + mutableRow.setNullAt(i) +} else { + writer(converter(v)) +} + } +} +(e, f) + } + + private def isPrimitive(dataType: DataType): Boolean = dataType match { +case BooleanType => true +case ByteType => true +case ShortType => true +case IntegerType => true +case LongType => true +case FloatType => true +case DoubleType => true +case _ => false + } + + private def generateSafeValueConverter(dt: DataType): Any => Any = dt match { +case ArrayType(elemType, _) => + if (isPrimitive(elemType)) { +v => { + val arrayValue = v.asInstanceOf[ArrayData] + new GenericArrayData(arrayValue.toArray[Any](elemType)) +} + } else { +val elementConverter = generateSafeValueConverter(elemType) +v => { + val arrayValue = v.asInstanceOf[ArrayData] + val result = new Array[Any](arrayValue.numElements()) + arrayValue.foreach(elemType, (i, e) => { +result(i) = elementConverter(e) + }) + new GenericArrayData(result) +} + } + +case st: StructType => + val fieldTypes = st.fields.map(_.dataType) + val fieldConverters = fieldTypes.map(generateSafeValueConverter) + v => { +val row = v.asInstanceOf[InternalRow] +val ar = new Array[Any](row.numFields) +var idx = 0 +while (idx < row.numFields) { + ar(idx) = fieldConverters(idx)(row.get(idx, fieldTypes(idx))) + idx += 1 +} +new GenericInternalRow(ar) + } + +case MapType(keyType, valueType, _) => + lazy val keyConverter = generateSafeValueConverter(keyType) + lazy val valueConverter = 
generateSafeValueConverter(valueType) + v => { +val mapValue = v.asInstanceOf[MapData] +val keys = mapValue.keyArray().toArray[Any](keyType) +val values = mapValue.valueArray().toArray[Any](valueType) +val convertedKeys = + if (isPrimitive(keyType)) keys else keys.map(keyConverter) --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r225521206 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData} +import org.apache.spark.sql.types._ + + +/** + * An interpreted version of a safe projection. + * + * @param expressions that produces the resulting fields. These expressions must be bound + *to a schema. + */ +class InterpretedSafeProjection(expressions: Seq[Expression]) extends Projection { + + private[this] val mutableRow = new SpecificInternalRow(expressions.map(_.dataType)) + + private[this] val exprsWithWriters = expressions.zipWithIndex.filter { +case (NoOp, _) => false +case _ => true + }.map { case (e, i) => +val converter = generateSafeValueConverter(e.dataType) +val writer = generateRowWriter(i, e.dataType) +val f = if (!e.nullable) { + (v: Any) => writer(converter(v)) +} else { + (v: Any) => { +if (v == null) { + mutableRow.setNullAt(i) +} else { + writer(converter(v)) +} + } +} +(e, f) + } + + private def isPrimitive(dataType: DataType): Boolean = dataType match { +case BooleanType => true +case ByteType => true +case ShortType => true +case IntegerType => true +case LongType => true +case FloatType => true +case DoubleType => true +case _ => false + } + + private def generateSafeValueConverter(dt: DataType): Any => Any = dt match { +case ArrayType(elemType, _) => + if (isPrimitive(elemType)) { --- End diff -- Let's not add this optimization at the beginning. We can add it later with a benchmark. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r225520812 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData} +import org.apache.spark.sql.types._ + + +/** + * An interpreted version of a safe projection. + * + * @param expressions that produces the resulting fields. These expressions must be bound + *to a schema. + */ +class InterpretedSafeProjection(expressions: Seq[Expression]) extends Projection { + + private[this] val mutableRow = new SpecificInternalRow(expressions.map(_.dataType)) + + private[this] val exprsWithWriters = expressions.zipWithIndex.filter { +case (NoOp, _) => false +case _ => true + }.map { case (e, i) => +val converter = generateSafeValueConverter(e.dataType) +val writer = generateRowWriter(i, e.dataType) +val f = if (!e.nullable) { + (v: Any) => writer(converter(v)) +} else { + (v: Any) => { +if (v == null) { + mutableRow.setNullAt(i) +} else { + writer(converter(v)) +} + } +} +(e, f) + } + + private def isPrimitive(dataType: DataType): Boolean = dataType match { --- End diff -- `CodeGenerator.isPrimitiveType` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r225520290 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData} +import org.apache.spark.sql.types._ + + +/** + * An interpreted version of a safe projection. + * + * @param expressions that produces the resulting fields. These expressions must be bound + *to a schema. + */ +class InterpretedSafeProjection(expressions: Seq[Expression]) extends Projection { + + private[this] val mutableRow = new SpecificInternalRow(expressions.map(_.dataType)) + + private[this] val exprsWithWriters = expressions.zipWithIndex.filter { +case (NoOp, _) => false --- End diff -- does `SafeProjection` need to handle `NoOp`? It's only used with `MutableProjection` in aggregate. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22724: [SPARK-25734][SQL] Literal should have a value correspon...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22724 `Literal` is an internal API, and AFAIK end-users can't construct an invalid `Literal` with public APIs. If they can, then it's a bug and we have a problem... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
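For illustration, the public path referred to here, where the value and its DataType are always derived together (hedged example):

```
import org.apache.spark.sql.functions.lit

// Public APIs pair the value with a matching DataType for the user,
// e.g. a Scala BigDecimal becomes a Decimal value with a DecimalType.
val c = lit(BigDecimal("3.14"))
```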
[GitHub] spark issue #22708: [SPARK-21402][SQL] Fix java array of structs deserializa...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22708 lgtm --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22708: [SPARK-21402][SQL] Fix java array of structs deserializa...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22708 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22728: [SPARK-25736][SQL][TEST] add tests to verify the behavio...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22728 FYI, I tried both Hive and Presto; neither of them supports multi-column count. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22597: [SPARK-25579][SQL] Use quoted attribute names if needed ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22597 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22724: [SPARK-25734][SQL] Literal should have a value correspon...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22724 LGTM pending jenkins --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22724: [SPARK-25734][SQL] Literal should have a value co...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22724#discussion_r225396907 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala --- @@ -196,6 +197,48 @@ object Literal { case other => throw new RuntimeException(s"no default for type $dataType") } + + private[expressions] def validateLiteralValue(value: Any, dataType: DataType): Unit = { +def doValidate(v: Any, dataType: DataType): Boolean = dataType match { + case BooleanType => v.isInstanceOf[Boolean] + case ByteType => v.isInstanceOf[Byte] + case ShortType => v.isInstanceOf[Short] + case IntegerType | DateType => v.isInstanceOf[Int] + case LongType | TimestampType => v.isInstanceOf[Long] + case FloatType => v.isInstanceOf[Float] + case DoubleType => v.isInstanceOf[Double] + case _: DecimalType => v.isInstanceOf[Decimal] + case CalendarIntervalType => v.isInstanceOf[CalendarInterval] + case BinaryType => v.isInstanceOf[Array[Byte]] + case StringType => v.isInstanceOf[UTF8String] + case st: StructType => +v.isInstanceOf[InternalRow] && { + val row = v.asInstanceOf[InternalRow] + st.fields.map(_.dataType).zipWithIndex.forall { +case (dt, i) => doValidate(row.get(i, dt), dt) + } +} + case at: ArrayType => +v.isInstanceOf[GenericArrayData] && { + val ar = v.asInstanceOf[GenericArrayData].array + ar.isEmpty || doValidate(ar.head, at.elementType) +} + case mt: MapType => +v.isInstanceOf[ArrayBasedMapData] && { + val map = v.asInstanceOf[ArrayBasedMapData] + map.numElements() == 0 || { --- End diff -- yup --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22379 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22724: [SPARK-25734][SQL] Literal should have a value co...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22724#discussion_r225392951 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala --- @@ -196,6 +197,48 @@ object Literal { case other => throw new RuntimeException(s"no default for type $dataType") } + + private[expressions] def validateLiteralValue(value: Any, dataType: DataType): Unit = { +def doValidate(v: Any, dataType: DataType): Boolean = dataType match { + case BooleanType => v.isInstanceOf[Boolean] + case ByteType => v.isInstanceOf[Byte] + case ShortType => v.isInstanceOf[Short] + case IntegerType | DateType => v.isInstanceOf[Int] + case LongType | TimestampType => v.isInstanceOf[Long] + case FloatType => v.isInstanceOf[Float] + case DoubleType => v.isInstanceOf[Double] + case _: DecimalType => v.isInstanceOf[Decimal] + case CalendarIntervalType => v.isInstanceOf[CalendarInterval] + case BinaryType => v.isInstanceOf[Array[Byte]] + case StringType => v.isInstanceOf[UTF8String] + case st: StructType => +v.isInstanceOf[InternalRow] && { + val row = v.asInstanceOf[InternalRow] + st.fields.map(_.dataType).zipWithIndex.forall { +case (dt, i) => doValidate(row.get(i, dt), dt) + } +} + case at: ArrayType => +v.isInstanceOf[GenericArrayData] && { + val ar = v.asInstanceOf[GenericArrayData].array + ar.isEmpty || doValidate(ar.head, at.elementType) +} + case mt: MapType => +v.isInstanceOf[ArrayBasedMapData] && { + val map = v.asInstanceOf[ArrayBasedMapData] + map.numElements() == 0 || { --- End diff -- we don't need this. The array validation already consider numElements --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22724: [SPARK-25734][SQL] Literal should have a value co...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22724#discussion_r225392843 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala --- @@ -196,6 +197,48 @@ object Literal { case other => throw new RuntimeException(s"no default for type $dataType") } + + private[expressions] def validateLiteralValue(value: Any, dataType: DataType): Unit = { +def doValidate(v: Any, dataType: DataType): Boolean = dataType match { + case BooleanType => v.isInstanceOf[Boolean] + case ByteType => v.isInstanceOf[Byte] + case ShortType => v.isInstanceOf[Short] + case IntegerType | DateType => v.isInstanceOf[Int] + case LongType | TimestampType => v.isInstanceOf[Long] + case FloatType => v.isInstanceOf[Float] + case DoubleType => v.isInstanceOf[Double] + case _: DecimalType => v.isInstanceOf[Decimal] + case CalendarIntervalType => v.isInstanceOf[CalendarInterval] + case BinaryType => v.isInstanceOf[Array[Byte]] + case StringType => v.isInstanceOf[UTF8String] + case st: StructType => +v.isInstanceOf[InternalRow] && { --- End diff -- can we do the same for array and map? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22724: [SPARK-25734][SQL] Literal should have a value co...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22724#discussion_r225392872 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala --- @@ -196,6 +197,48 @@ object Literal { case other => throw new RuntimeException(s"no default for type $dataType") } + + private[expressions] def validateLiteralValue(value: Any, dataType: DataType): Unit = { +def doValidate(v: Any, dataType: DataType): Boolean = dataType match { + case BooleanType => v.isInstanceOf[Boolean] + case ByteType => v.isInstanceOf[Byte] + case ShortType => v.isInstanceOf[Short] + case IntegerType | DateType => v.isInstanceOf[Int] + case LongType | TimestampType => v.isInstanceOf[Long] + case FloatType => v.isInstanceOf[Float] + case DoubleType => v.isInstanceOf[Double] + case _: DecimalType => v.isInstanceOf[Decimal] + case CalendarIntervalType => v.isInstanceOf[CalendarInterval] + case BinaryType => v.isInstanceOf[Array[Byte]] + case StringType => v.isInstanceOf[UTF8String] + case st: StructType => +v.isInstanceOf[InternalRow] && { + val row = v.asInstanceOf[InternalRow] + st.fields.map(_.dataType).zipWithIndex.forall { +case (dt, i) => doValidate(row.get(i, dt), dt) + } +} + case at: ArrayType => +v.isInstanceOf[GenericArrayData] && { + val ar = v.asInstanceOf[GenericArrayData].array + ar.isEmpty || doValidate(ar.head, at.elementType) --- End diff -- I think we need to validate all the elements --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
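A standalone sketch of the "validate all elements" variant being requested, assuming `doValidate` is the recursive checker from the diff above (a fuller version would also need to accept null elements for nullable array types):

```
import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.types.{ArrayType, DataType}

// Sketch only: check every element of the array literal, not just the head.
def validateArrayLiteral(
    v: Any,
    at: ArrayType,
    doValidate: (Any, DataType) => Boolean): Boolean = {
  v.isInstanceOf[GenericArrayData] &&
    v.asInstanceOf[GenericArrayData].array.forall(e => doValidate(e, at.elementType))
}
```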
[GitHub] spark pull request #22597: [SPARK-25579][SQL] Use quoted attribute names if ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22597#discussion_r225373279 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala --- @@ -383,4 +385,17 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext { )).get.toString } } + + test("SPARK-25579 ORC PPD should support column names with dot") { +import testImplicits._ + +withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") { + withTempDir { dir => +val path = new File(dir, "orc").getCanonicalPath +Seq((1, 2), (3, 4)).toDF("col.dot.1", "col.dot.2").write.orc(path) +val df = spark.read.orc(path).where("`col.dot.1` = 1 and `col.dot.2` = 2") +checkAnswer(stripSparkFilter(df), Row(1, 2)) --- End diff -- ORC data source doesn't support nested column pruning yet. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22597: [SPARK-25579][SQL] Use quoted attribute names if ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22597#discussion_r225373223 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala --- @@ -383,4 +385,17 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext { )).get.toString } } + + test("SPARK-25579 ORC PPD should support column names with dot") { +import testImplicits._ + +withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") { + withTempDir { dir => +val path = new File(dir, "orc").getCanonicalPath +Seq((1, 2), (3, 4)).toDF("col.dot.1", "col.dot.2").write.orc(path) --- End diff -- Do not rely on implicit environment values, let's make the test as explicit as possible. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22719: [SPARK-25714] [BACKPORT-2.2] Fix Null Handling in the Op...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22719 thanks, merging to 2.2! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22718: [SPARK-25714] [BACKPORT-2.3] Fix Null Handling in the Op...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22718 thanks, merging to 2.3! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22724: [SPARK-25734][SQL] Literal should have a value co...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22724#discussion_r225368754 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala --- @@ -196,6 +197,48 @@ object Literal { case other => throw new RuntimeException(s"no default for type $dataType") } + + private[expressions] def validateLiteralValue(value: Any, dataType: DataType): Unit = { +def doValidate(v: Any, dataType: DataType): Boolean = dataType match { + case BooleanType => v.isInstanceOf[Boolean] + case ByteType => v.isInstanceOf[Byte] + case ShortType => v.isInstanceOf[Short] + case IntegerType | DateType => v.isInstanceOf[Int] + case LongType | TimestampType => v.isInstanceOf[Long] + case FloatType => v.isInstanceOf[Float] + case DoubleType => v.isInstanceOf[Double] + case _: DecimalType => v.isInstanceOf[Decimal] + case CalendarIntervalType => v.isInstanceOf[CalendarInterval] + case BinaryType => v.isInstanceOf[Array[Byte]] + case StringType => v.isInstanceOf[UTF8String] + case st: StructType => +v.isInstanceOf[GenericInternalRow] && { --- End diff -- This is definitely not true. Can we do ``` v.isInstanceOf[InternalRow] && { val row = v.asInstanceOf[InternalRow] st.fields.map(_.dataType).zipWithIndex.forall { case (dt, i) => doValidate(row.get(i, dt), dt) } } ``` (using `forall` so the check yields a Boolean)? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22731: [SPARK-25674][FOLLOW-UP] Update the stats for each Colum...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22731 LGTM, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22733: [SPARK-25738][SQL] Fix LOAD DATA INPATH for hdfs port
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22733 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22728: [SPARK-25736][SQL][TEST] add tests to verify the behavio...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22728 BTW MySQL doesn't support `count(a, b)` but it does support `count(distinct a, b)`, and the result is the same as Spark's. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
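For readers who want to observe the behavior being documented, here is a small local experiment (the view name `t` and its values are made up for illustration; the exact counts follow Spark's null handling for multi-column count):
```
import org.apache.spark.sql.SparkSession

object MultiColumnCountCheck extends App {
  val spark = SparkSession.builder().master("local[*]").appName("count-check").getOrCreate()
  // Rows with and without NULLs, to see which rows each form of count includes.
  spark.sql("CREATE OR REPLACE TEMPORARY VIEW t AS SELECT * FROM VALUES (1, 1), (1, NULL), (NULL, NULL) AS t(a, b)")
  // Spark accepts both forms; per the comment above, MySQL only accepts the DISTINCT form.
  spark.sql("SELECT count(a, b), count(DISTINCT a, b) FROM t").show()
  spark.stop()
}
```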
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r225218562 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -777,7 +777,6 @@ case class SchemaOfJson( } object JsonExprUtils { - def evalSchemaExpr(exp: Expression): DataType = exp match { --- End diff -- makes sense, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22724: [SPARK-25734][SQL] Literal should have a value co...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22724#discussion_r225218350 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala --- @@ -196,6 +197,31 @@ object Literal { case other => throw new RuntimeException(s"no default for type $dataType") } + + private[expressions] def validateLiteralValue(v: Any, dataType: DataType): Unit = { +def doValidate(v: Any, dataType: DataType): Boolean = dataType match { + case BooleanType => v.isInstanceOf[Boolean] + case ByteType => v.isInstanceOf[Byte] + case ShortType => v.isInstanceOf[Short] + case IntegerType | DateType => v.isInstanceOf[Int] + case LongType | TimestampType => v.isInstanceOf[Long] + case FloatType => v.isInstanceOf[Float] + case DoubleType => v.isInstanceOf[Double] + case _: DecimalType => v.isInstanceOf[Decimal] + case CalendarIntervalType => v.isInstanceOf[CalendarInterval] + case BinaryType => v.isInstanceOf[Array[Byte]] + case StringType => v.isInstanceOf[UTF8String] + case _: StructType => v.isInstanceOf[InternalRow] + case _: ArrayType => v.isInstanceOf[ArrayData] + case _: MapType => v.isInstanceOf[MapData] --- End diff -- ah good point! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22728: [SPARK-25736][SQL][TEST] add tests to verify the behavio...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22728 cc @gatorsmile @mgaido91 @viirya --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22728: [SPARK-25736][SQL][TEST] add tests to verify the ...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/22728 [SPARK-25736][SQL][TEST] add tests to verify the behavior of multi-column count ## What changes were proposed in this pull request? AFAIK multi-column count is not widely supported by the mainstream databases(postgres doesn't support), and the SQL standard doesn't define it clearly, as near as I can tell. Since Spark supports it, we should clearly document the current behavior and add tests to verify it. ## How was this patch tested? N/A You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark doc Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22728.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22728 commit 62b4b84f135c2f71ecc8192deabec3d694b6bbc9 Author: Wenchen Fan Date: 2018-10-15T15:25:14Z add tests to verify the behavior of count for corner cases --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22379 have you addressed https://github.com/apache/spark/pull/22379/files#r225033808 ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r225183977 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -777,7 +777,6 @@ case class SchemaOfJson( } object JsonExprUtils { - def evalSchemaExpr(exp: Expression): DataType = exp match { --- End diff -- do we still need it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22708: [SPARK-21402] Fix java array/map of structs deser...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22708#discussion_r225182746 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala --- @@ -271,32 +272,41 @@ object JavaTypeInference { case c if listType.isAssignableFrom(typeToken) => val et = elementType(typeToken) -MapObjects( +UnresolvedMapObjects( --- End diff -- yes please, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22029: [SPARK-24395][SQL] IN operator should return NULL when c...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22029 I don't have a strong opinion. cc @gatorsmile @hvanhovell @juliuszsompolski --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22029: [SPARK-24395][SQL] IN operator should return NULL when c...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22029 So, the goal here is to make the behavior consistent between multi-column IN-subquery and multi-column normal IN for Spark. That said, I feel it's reasonable to change the behavior of `(a, b) in (struct_col1, struct_col2, ...)` to return null if a field is null, but it seems pretty weird to also apply this behavior to `input_struct_col in (struct_col1, struct_col2, ...)`. It's OK to treat the `(...)` syntax specially, but it's strange to treat struct types differently. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22724: [SPARK-25734][SQL] Literal should have a value correspon...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22724 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22029: [SPARK-24395][SQL] IN operator should return NULL when c...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22029 > Oracle does the same. So what's the behavior? Does it return false? > Hive behaves like Spark now (before and after the PR) for this case. Again, what's the behavior? Does it return false? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Support par...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20999 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22715: [SPARK-25727][SQL] Add outputOrdering to otherCopyArgs i...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22715 Hi @mgaido91 , since you are the major author of this part, do you have time to open a PR and move `outputOrdering` to the main constructor? Thanks in advance! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22713: [SPARK-25691][SQL] Use semantic equality in Alias...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22713#discussion_r225119359 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala --- @@ -112,8 +112,8 @@ object EliminateView extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { // The child should have the same output attributes with the View operator, so we simply // remove the View operator. -case View(_, output, child) => - assert(output == child.output, +case v @ View(_, output, child) => + assert(v.sameOutput(child), --- End diff -- +1 for adding a test case. BTW does it impact end-users? If it does we need to backport it to 2.4. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22708: [SPARK-21402] Fix java array/map of structs deser...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22708#discussion_r225118613 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala --- @@ -271,32 +272,41 @@ object JavaTypeInference { case c if listType.isAssignableFrom(typeToken) => val et = elementType(typeToken) -MapObjects( +UnresolvedMapObjects( --- End diff -- Can we exclude all changes except this one? This one is very easy to reason about. We did the same thing in `ScalaReflection`. We need more time to think about the map case, and fix it in `ScalaReflection` as well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22561: [SPARK-25548][SQL]In the PruneFileSourcePartitions optim...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22561 Your rewrite is applied to the entire predicate expression tree, so what about the case `NOT(p AND x)`? If `p` is true and `x` is false, then `NOT(p AND x)` is true, but after the rewrite it becomes `NOT(p AND true)`, which evaluates to false. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
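The counterexample can be checked with plain booleans (nothing Spark-specific; `p` stands for the partition-column conjunct and `x` for the data-column conjunct that the rewrite replaces with `true`):
```
val p = true   // partition predicate holds for this partition
val x = false  // data predicate fails for every row in it
val original  = !(p && x)    // true  -> the partition does contain rows we must keep
val rewritten = !(p && true) // false -> the rewrite would wrongly prune the partition
assert(original != rewritten)
```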
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r225108214 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -523,35 +523,114 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitionsFilters: Seq[Seq[Expression]], ifExists: Boolean, purge: Boolean, retainData: Boolean) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog +val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone) val table = catalog.getTableMetadata(tableName) +val partitionColumns = table.partitionColumnNames +val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val resolvedSpecs = partitionsFilters.flatMap { filtersSpec => + if (hasComplexFilters(filtersSpec)) { +generatePartitionSpec(filtersSpec, + partitionColumns, + partitionAttributes, + table.identifier, + catalog, + sparkSession.sessionState.conf.resolver, + timeZone, + ifExists) + } else { +val partitionSpec = filtersSpec.map { + case EqualTo(key: Attribute, Literal(value, StringType)) => +key.name -> value.toString +}.toMap +PartitioningUtils.normalizePartitionSpec( + partitionSpec, + partitionColumns, + table.identifier.quotedString, + sparkSession.sessionState.conf.resolver) :: Nil + } } catalog.dropPartitions( --- End diff -- So the implementation here is similar to how hive implements it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22029: [SPARK-24395][SQL] IN operator should return NULL when c...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22029 Do you know how other databases behave for `(a, b) in (struct_col1, struct_col2, ...)` and `input_struct_col in (struct_col1, struct_col2, ...)`? Since `(a, b)` may be treated specially, we need to look at the behaviors carefully. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22713: [SPARK-25691][SQL] Use semantic equality in Alias...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22713#discussion_r225103000 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala --- @@ -124,4 +124,11 @@ class RemoveRedundantAliasAndProjectSuite extends PlanTest with PredicateHelper val expected = Subquery(relation.select('a as "a", 'b).where('b < 10).select('a).analyze) comparePlans(optimized, expected) } + + test("SPARK-25691: RemoveRedundantProject works also with different cases") { +val relation = LocalRelation('a.int, 'b.int) +val query = relation.select('A, 'b).analyzeCaseInsensitive +val optimized = Optimize.execute(query) +comparePlans(optimized, relation) --- End diff -- Spark can be case-sensitive or not w.r.t. the config, but Spark should always be case-preserving. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r225079174 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala --- @@ -34,11 +34,16 @@ import org.apache.spark.sql.types._ * @param tableDesc the metadata of the table to be created. * @param mode the data writing mode * @param query an optional logical plan representing data to write into the created table. + * @param useExternalSerde whether to use external serde to write data, e.g., Hive Serde. Currently --- End diff -- This is too hacky. We should not leak Hive-specific knowledge into general logical plans. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r225073862 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala --- @@ -382,6 +382,30 @@ case class OuterReference(e: NamedExpression) override def newInstance(): NamedExpression = OuterReference(e.newInstance()) } +/** + * A place holder used to hold the name of the partition attributes specified when running commands + * involving partitions, eg. ALTER TABLE ... DROP PARTITIONS. + */ +case class PartitioningAttribute(name: String) + extends Attribute with Unevaluable { + override val exprId: ExprId = NamedExpression.newExprId --- End diff -- Even though it's a fake attribute, we should not change the `exprId` when this expression gets copied. Can we move `exprId` to the constructor? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
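A self-contained illustration of the point about copies (plain Scala; a counter and toy case classes stand in for `ExprId` and catalyst attributes):
```
object IdGen { private var i = 0L; def next(): Long = { i += 1; i } }

// Id assigned in the class body: the constructor re-evaluates it, so copy() gets a new id.
case class UnstableAttr(name: String) { val id: Long = IdGen.next() }
// Id as a constructor parameter with a default: copy() carries the existing value over.
case class StableAttr(name: String, id: Long = IdGen.next())

val a = UnstableAttr("p")
assert(a.copy().id != a.id) // the id silently changes when the expression is copied
val b = StableAttr("p")
assert(b.copy().id == b.id) // the id survives copies, which is what the review asks for
```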
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r225074108 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala --- @@ -382,6 +382,30 @@ case class OuterReference(e: NamedExpression) override def newInstance(): NamedExpression = OuterReference(e.newInstance()) } +/** + * A place holder used to hold the name of the partition attributes specified when running commands + * involving partitions, eg. ALTER TABLE ... DROP PARTITIONS. + */ +case class PartitioningAttribute(name: String) + extends Attribute with Unevaluable { + override val exprId: ExprId = NamedExpression.newExprId + // Not really needed and used. We just need a dataType to be used during analysis for resolving + // the expressions. The String type is used because all the literals in PARTITION operations are + // parsed as strings and eventually casted later. + override def dataType: DataType = StringType --- End diff -- If it's not needed, can we throw exception here? We may need to override `toString` though. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r225076055 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -523,35 +523,114 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitionsFilters: Seq[Seq[Expression]], ifExists: Boolean, purge: Boolean, retainData: Boolean) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog +val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone) val table = catalog.getTableMetadata(tableName) +val partitionColumns = table.partitionColumnNames +val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val resolvedSpecs = partitionsFilters.flatMap { filtersSpec => + if (hasComplexFilters(filtersSpec)) { +generatePartitionSpec(filtersSpec, + partitionColumns, + partitionAttributes, + table.identifier, + catalog, + sparkSession.sessionState.conf.resolver, + timeZone, + ifExists) + } else { +val partitionSpec = filtersSpec.map { + case EqualTo(key: Attribute, Literal(value, StringType)) => +key.name -> value.toString +}.toMap +PartitioningUtils.normalizePartitionSpec( + partitionSpec, + partitionColumns, + table.identifier.quotedString, + sparkSession.sessionState.conf.resolver) :: Nil + } } catalog.dropPartitions( --- End diff -- does hive have an API to drop partitions with a predicate? I think the current approach is very inefficient with non-equal partition predicates. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20999#discussion_r225074416 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala --- @@ -293,6 +293,31 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a partition specification map with filters. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = { --- End diff -- nit: can we move `withOrigin(ctx)` here? i.e. ``` def xxx(): T = withOrigin(ctx) { ... } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22263: [SPARK-25269][SQL] SQL interface support specify Storage...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22263 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21322 Introducing a new concept which is similar to broadcast seems like overkill. We can just update broadcast to allow it to be memory-only. However, there might be simpler solutions that fit your case, without broadcast, e.g. ``` val myObj = ... rdd.mapPartitions { it => try { // process data } finally { myObj.close() } } ``` I think we need to clearly define the use case and think about whether we need a new API or not. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22561: [SPARK-25548][SQL]In the PruneFileSourcePartitions optim...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22561 Please describe the idea clearly in the PR description. After reading the code, I think the idea is: for any predicate `p`, we can rewrite it to a `p'` that only contains partition columns. The requirement is that every partition returned by the original predicate `p` must also be returned by `p'`. We need to be careful here not to introduce correctness issues. Can you prove that your rewrite is safe? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22379 I wouldn't make the schema a `Column`. `Column` means it's dynamic, while I think it should be a static value to simplify the implementation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22379 BTW how would the `schema_of_csv` function help with `from_csv`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22379 Looks pretty good! Just one minor comment. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r225033899 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala --- @@ -254,7 +256,7 @@ object TextInputCSVDataSource extends CSVDataSource { val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions) val sampled: Dataset[String] = CSVUtils.sample(csv, parsedOptions) val tokenRDD = sampled.rdd.mapPartitions { iter => - val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions) + val filteredLines = filterCommentAndEmpty(iter, parsedOptions) --- End diff -- +1 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22379#discussion_r225033808 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.csv._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * Converts a CSV input string to a [[StructType]] with the specified schema. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with the given `csvStr` and `schema`.", + examples = """ +Examples: + > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} + > SELECT _FUNC_('26/08/2015', 'time Timestamp', map('timestampFormat', 'dd/MM/')) + {"time":2015-08-26 00:00:00.0} + """, + since = "3.0.0") +// scalastyle:on line.size.limit +case class CsvToStructs( +schema: StructType, +options: Map[String, String], +child: Expression, +timeZoneId: Option[String] = None) + extends UnaryExpression +with TimeZoneAwareExpression +with CodegenFallback +with ExpectsInputTypes +with NullIntolerant { + + override def nullable: Boolean = child.nullable + + // The CSV input data might be missing certain fields. We force the nullability + // of the user-provided schema to avoid data corruptions. + val nullableSchema: StructType = schema.asNullable + + // Used in `FunctionRegistry` + def this(child: Expression, schema: Expression, options: Map[String, String]) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = options, + child = child, + timeZoneId = None) + + def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String]) + + def this(child: Expression, schema: Expression, options: Expression) = +this( + schema = ExprUtils.evalSchemaExpr(schema), + options = ExprUtils.convertToMapData(options), + child = child, + timeZoneId = None) + + // This converts parsed rows to the desired output by the given schema. + @transient + lazy val converter = (rows: Iterator[InternalRow]) => { +if (rows.hasNext) { + rows.next() --- End diff -- what if `rows` have more than one row? shall we fail or shall we return null? Up to my understanding it should fail. The parser should only return one row for struct type. If it doesn't, there must be a bug. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
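A standalone sketch of the contract the review asks for, using a plain iterator and made-up names (not part of the actual patch): extra rows from the parser should fail loudly rather than be silently dropped.
```
// Hypothetical helper illustrating "exactly one row, otherwise it's a bug".
def single[T](rows: Iterator[T]): T = {
  require(rows.hasNext, "expected exactly one parsed row, but got none")
  val result = rows.next()
  require(!rows.hasNext, "expected exactly one parsed row, but got more - likely a parser bug")
  result
}

single(Iterator(42))      // 42
// single(Iterator(1, 2)) // throws: more than one row for a struct value signals a bug
```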
[GitHub] spark issue #22029: [SPARK-24395][SQL] IN operator should return NULL when c...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22029 It looks to me that this is another instance of special handling `(a, b, ..)`, like https://github.com/apache/spark/pull/21403 `(a, b) in (struct_col1, struct_col2, ...)` is different from `input_struct_col in (struct_col1, struct_col2, ...)`, right? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22708: [SPARK-21402] Fix java array/map of structs deserializat...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22708 Can you explain how this happens? Why do the fields of the structs get mixed up? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22561: [SPARK-25548][SQL]In the PruneFileSourcePartition...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22561#discussion_r225030929 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala --- @@ -39,21 +40,31 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { _, _)) if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined => + + val sparkSession = fsRelation.sparkSession + val partitionColumns = +logicalRelation.resolve( + partitionSchema, sparkSession.sessionState.analyzer.resolver) + val partitionSet = AttributeSet(partitionColumns) // The attribute name of predicate could be different than the one in schema in case of // case insensitive, we should change them to match the one in schema, so we donot need to // worry about case sensitivity anymore. val normalizedFilters = filters.map { e => -e transform { +e transformUp { case a: AttributeReference => a.withName(logicalRelation.output.find(_.semanticEquals(a)).get.name) + // Replace the nonPartitionOps field with true in the And(partitionOps, nonPartitionOps) + // to make the partition can be pruned + case and @And(left, right) => +val leftPartition = left.references.filter(partitionSet.contains(_)) +val rightPartition = right.references.filter(partitionSet.contains(_)) +if (leftPartition.size == left.references.size && rightPartition.size == 0) { + and.withNewChildren(Seq(left, Literal(true, BooleanType))) +} else if (leftPartition.size == 0 && rightPartition.size == right.references.size) { + and.withNewChildren(Seq(Literal(true, BooleanType), right)) +} else and } } - - val sparkSession = fsRelation.sparkSession - val partitionColumns = -logicalRelation.resolve( - partitionSchema, sparkSession.sessionState.analyzer.resolver) - val partitionSet = AttributeSet(partitionColumns) val partitionKeyFilters = --- End diff -- I think a simpler change is ``` val partitionKeyFilters = ExpressionSet(normalizedFilters.map(splitConjunctivePredicates).filter...) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
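A toy, self-contained version of the idea behind the suggested one-liner (strings stand in for catalyst expressions; this is not the Spark implementation): split the filter into top-level conjuncts and keep only the conjuncts that reference partition columns alone, so nothing under a `NOT(...)` is ever rewritten.
```
sealed trait Pred
case class Ref(col: String) extends Pred             // predicate touching a single column
case class And(left: Pred, right: Pred) extends Pred
case class Not(child: Pred) extends Pred

def splitConjuncts(p: Pred): Seq[Pred] = p match {
  case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
  case other     => Seq(other)
}

def refs(p: Pred): Set[String] = p match {
  case Ref(c)    => Set(c)
  case And(l, r) => refs(l) ++ refs(r)
  case Not(c)    => refs(c)
}

val partitionCols = Set("dt")
val filter = And(Ref("dt"), Not(And(Ref("dt"), Ref("user_id"))))
val pruningPreds = splitConjuncts(filter).filter(refs(_).subsetOf(partitionCols))
// pruningPreds == Seq(Ref("dt")): only whole partition-only conjuncts are used for pruning
```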
[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22309#discussion_r225030497 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala --- @@ -125,6 +125,17 @@ object ScalaReflection extends ScalaReflection { case _ => false } + def isValueClass(tpe: `Type`): Boolean = { +val notNull = !(tpe <:< localTypeOf[Null]) +notNull && definedByConstructorParams(tpe) && tpe <:< localTypeOf[AnyVal] --- End diff -- Why must a value class be a `Product`? Is it because there is no other way to get the value class's field name and type? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22309#discussion_r225030384 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala --- @@ -376,6 +387,23 @@ object ScalaReflection extends ScalaReflection { dataType = ObjectType(udt.getClass)) Invoke(obj, "deserialize", ObjectType(udt.userClass), getPath :: Nil) + case t if isValueClass(t) => +// nested value class is treated as its underlying type +// top level value class must be treated as a product +val underlyingType = getUnderlyingTypeOf(t) +val underlyingClsName = getClassNameFromType(underlyingType) +val clsName = t.typeSymbol.asClass.fullName --- End diff -- how about `getClassNameFromType(t)`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22309#discussion_r225030352 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala --- @@ -376,6 +387,23 @@ object ScalaReflection extends ScalaReflection { dataType = ObjectType(udt.getClass)) Invoke(obj, "deserialize", ObjectType(udt.userClass), getPath :: Nil) + case t if isValueClass(t) => +// nested value class is treated as its underlying type +// top level value class must be treated as a product +val underlyingType = getUnderlyingTypeOf(t) +val underlyingClsName = getClassNameFromType(underlyingType) +val clsName = t.typeSymbol.asClass.fullName +val newTypePath = s"""- Scala value class: $clsName($underlyingClsName)""" +: + walkedTypePath + +val arg = deserializerFor(underlyingType, path, newTypePath) +if (path.isDefined) { + arg --- End diff -- why can we skip the `NewInstance`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22697: [SPARK-25700][SQL][BRANCH-2.4] Partially revert append m...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22697 thanks, merging to 2.4! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22713: [SPARK-25691][SQL] Use semantic equality in Optim...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22713#discussion_r225025827 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala --- @@ -124,4 +124,11 @@ class RemoveRedundantAliasAndProjectSuite extends PlanTest with PredicateHelper val expected = Subquery(relation.select('a as "a", 'b).where('b < 10).select('a).analyze) comparePlans(optimized, expected) } + + test("SPARK-25691: RemoveRedundantProject works also with different cases") { +val relation = LocalRelation('a.int, 'b.int) +val query = relation.select('A, 'b).analyzeCaseInsensitive +val optimized = Optimize.execute(query) +comparePlans(optimized, relation) --- End diff -- I agree that using `==` on attributes is error-prone, but we should update them one by one, to narrow down the scope and make sure each change is reasonable. For instance, I don't think this is a valid case: if we optimize it, the final schema field names will change, which is a breaking change if this plan is an input of a parquet-writing plan (the resulting parquet files will have a different schema). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org