[GitHub] spark pull request #23141: [SPARK-26021][SQL][followup] add test for special...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23141#discussion_r239333919

--- Diff: common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java ---
@@ -165,10 +165,14 @@ public void writeMinusZeroIsReplacedWithZero() {
     byte[] floatBytes = new byte[Float.BYTES];
     Platform.putDouble(doubleBytes, Platform.BYTE_ARRAY_OFFSET, -0.0d);
     Platform.putFloat(floatBytes, Platform.BYTE_ARRAY_OFFSET, -0.0f);
-    double doubleFromPlatform = Platform.getDouble(doubleBytes, Platform.BYTE_ARRAY_OFFSET);
-    float floatFromPlatform = Platform.getFloat(floatBytes, Platform.BYTE_ARRAY_OFFSET);
-    Assert.assertEquals(Double.doubleToLongBits(0.0d), Double.doubleToLongBits(doubleFromPlatform));
-    Assert.assertEquals(Float.floatToIntBits(0.0f), Float.floatToIntBits(floatFromPlatform));
+    byte[] doubleBytes2 = new byte[Double.BYTES];
+    byte[] floatBytes2 = new byte[Float.BYTES];
+    Platform.putDouble(doubleBytes, Platform.BYTE_ARRAY_OFFSET, 0.0d);
+    Platform.putFloat(floatBytes, Platform.BYTE_ARRAY_OFFSET, 0.0f);
--- End diff --

ah good catch! I'm surprised this test passed before...
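For context on why these assertions compare bit patterns rather than values: `-0.0 == 0.0` is true under IEEE 754 comparison, so a plain equality assertion would pass even if the write path kept the minus-zero bits. A minimal Scala sketch of the idea (`normalize` here is a hypothetical stand-in for the write-path normalization, not the actual `Platform` code):

```scala
object MinusZeroBits {
  // Hypothetical stand-in for the normalization the test exercises.
  def normalize(d: Double): Double = if (d == -0.0d) 0.0d else d

  def main(args: Array[String]): Unit = {
    // IEEE 754 value comparison treats -0.0 and 0.0 as equal...
    assert(-0.0d == 0.0d)
    // ...but their bit patterns differ, which is exactly what raw
    // UnsafeRow binary comparison sees.
    assert(java.lang.Double.doubleToRawLongBits(-0.0d) !=
      java.lang.Double.doubleToRawLongBits(0.0d))
    // So a normalizing write path must be verified at the bit level:
    assert(java.lang.Double.doubleToRawLongBits(normalize(-0.0d)) ==
      java.lang.Double.doubleToRawLongBits(0.0d))
  }
}
```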
[GitHub] spark issue #23239: [SPARK-26021][SQL][followup] only deal with NaN and -0.0...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23239

Yes, the 3 cases I pointed out that need to handle NaN and -0.0 do not change the value in `UnsafeRow`.
[GitHub] spark issue #23239: [SPARK-26021][SQL][followup] only deal with NaN and -0.0...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23239

cc @adoron @kiszk @viirya @gatorsmile
[GitHub] spark pull request #23239: [SPARK-26021][SQL][followup] only deal with NaN a...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/23239

[SPARK-26021][SQL][followup] only deal with NaN and -0.0 in UnsafeWriter

## What changes were proposed in this pull request?

A followup of https://github.com/apache/spark/pull/23043

There are 4 places where we need to deal with NaN and -0.0:
1. Range partitioner (the sorter). `-0.0` and `0.0` should be assigned to the same partition. Different NaNs should be assigned to the same partition.
2. Join keys. `-0.0` and `0.0` should be treated as the same. Different NaNs should be treated as the same.
3. Grouping keys. `-0.0` and `0.0` should be assigned to the same group. Different NaNs should be assigned to the same group.
4. Comparison expressions. `-0.0` and `0.0` should be treated as the same. Different NaNs should be treated as the same.

Case 4 is OK. Our comparison already handles NaN and -0.0, and for struct/array/map, we recursively compare the fields/elements.

Cases 1, 2 and 3 are problematic, as they compare `UnsafeRow` binary directly, and different NaNs have different binary representations; the same thing happens for -0.0 and 0.0.

To fix it, a simple solution is: let `UnsafeProjection` always produce `UnsafeRow`s with NaN and -0.0 normalized (use the standard NaN and replace -0.0 with 0.0). The `UnsafeRow`s in cases 1, 2 and 3 are all created by `UnsafeProjection`.

Following this direction, this PR moves the handling of NaN and -0.0 from `Platform` to `UnsafeWriter`, so that places like `UnsafeRow.setFloat` will not handle them, which reduces the perf overhead. It's also easier to add comments explaining why we do it in `UnsafeWriter`.

## How was this patch tested?

existing tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark minor

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23239.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #23239

commit 797ade3eb175c41866efbffa3cb4c30f90e49ca7
Author: Wenchen Fan
Date: 2018-12-05T15:05:39Z

    only deal with NaN and -0.0 in UnsafeWriter
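To make "use the standard NaN and replace -0.0 with 0.0" concrete, here is a minimal Scala sketch of the normalization a writer could apply before a value reaches the row's binary form (illustrative only; `normalize` is not the actual `UnsafeWriter` method):

```scala
object NormalizeSketch {
  // Normalize before writing into an UnsafeRow, so rows compared as raw
  // binary treat all NaNs and both zeros consistently.
  def normalize(value: Double): Double = {
    if (value.isNaN) Double.NaN     // collapse to the canonical NaN bit pattern
    else if (value == -0.0d) 0.0d   // folds -0.0 into +0.0 (a no-op for +0.0)
    else value
  }

  def main(args: Array[String]): Unit = {
    val raw = (d: Double) => java.lang.Double.doubleToRawLongBits(d)
    // -0.0 and 0.0 share one binary representation after normalization.
    assert(raw(normalize(-0.0d)) == raw(normalize(0.0d)))
    // A NaN with a non-standard payload collapses to the canonical NaN.
    val oddNaN = java.lang.Double.longBitsToDouble(0x7ff0000000000001L)
    assert(raw(normalize(oddNaN)) == raw(normalize(Double.NaN)))
  }
}
```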
[GitHub] spark issue #23235: [SPARK-26151][SQL][FOLLOWUP] Return partial results for ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23235

thanks, merging to master!
[GitHub] spark issue #23227: [SPARK-26271][FOLLOW-UP][SQL] remove unuse object SparkP...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23227

thanks, merging to master!
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239090244

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala ---
@@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
     FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"),
     RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
 }
+
+/**
+ * A shuffle write metrics reporter for SQL exchange operators. Different with
+ * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in
+ * shuffle dependency, so the local SQLMetric should transient and create on executor.
+ * @param metrics Shuffle write metrics in current SparkPlan.
+ * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter.
+ */
+private[spark] case class SQLShuffleWriteMetricsReporter(
+    metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter)
+  extends ShuffleWriteMetricsReporter with Serializable {
+  @transient private[this] lazy val _bytesWritten =
+    metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN)
+  @transient private[this] lazy val _recordsWritten =
+    metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN)
+  @transient private[this] lazy val _writeTime =
+    metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)
+
+  override private[spark] def incBytesWritten(v: Long): Unit = {
+    metricsReporter.incBytesWritten(v)
+    _bytesWritten.add(v)
+  }
+  override private[spark] def decRecordsWritten(v: Long): Unit = {
+    metricsReporter.decBytesWritten(v)
+    _recordsWritten.set(_recordsWritten.value - v)
+  }
+  override private[spark] def incRecordsWritten(v: Long): Unit = {
+    metricsReporter.incRecordsWritten(v)
+    _recordsWritten.add(v)
+  }
+  override private[spark] def incWriteTime(v: Long): Unit = {
+    metricsReporter.incWriteTime(v)
+    _writeTime.add(v)
+  }
+  override private[spark] def decBytesWritten(v: Long): Unit = {
+    metricsReporter.decBytesWritten(v)
+    _bytesWritten.set(_bytesWritten.value - v)
+  }
+}
+
+private[spark] object SQLShuffleWriteMetricsReporter {
+  val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten"
+  val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten"
+  val SHUFFLE_WRITE_TIME = "shuffleWriteTime"
--- End diff --

cc @rxin, do you think we should change this metric to use ms as well? In all the places that read/write it.
[GitHub] spark issue #23213: [SPARK-26262][SQL] Runs SQLQueryTestSuite on mixed confi...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23213

If we look at test coverage, `wholeStage=false, factoryMode=CODE_ONLY` will go through code paths that wholeStageCodegen doesn't cover. Or did I miss something?
[GitHub] spark issue #23213: [SPARK-26262][SQL] Runs SQLQueryTestSuite on mixed confi...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23213

But whole stage codegen will not test `GenerateUnsafeProject`, `GenerateMutableProject`, etc., right?
[GitHub] spark issue #23222: [SPARK-20636] Add the rule TransposeWindow to the optimi...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23222

Jenkins passes, which means the previously added end-to-end test can't show the benefit of this rule. We should update it.
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239060606

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala ---
@@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter {
     FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"),
     RECORDS_READ -> SQLMetrics.createMetric(sc, "records read"))
 }
+
+/**
+ * A shuffle write metrics reporter for SQL exchange operators. Different with
+ * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in
+ * shuffle dependency, so the local SQLMetric should transient and create on executor.
+ * @param metrics Shuffle write metrics in current SparkPlan.
+ * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter.
+ */
+private[spark] case class SQLShuffleWriteMetricsReporter(
+    metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter)
+  extends ShuffleWriteMetricsReporter with Serializable {
+  @transient private[this] lazy val _bytesWritten =
+    metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN)
+  @transient private[this] lazy val _recordsWritten =
+    metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN)
+  @transient private[this] lazy val _writeTime =
+    metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)
+
+  override private[spark] def incBytesWritten(v: Long): Unit = {
+    metricsReporter.incBytesWritten(v)
+    _bytesWritten.add(v)
+  }
+  override private[spark] def decRecordsWritten(v: Long): Unit = {
+    metricsReporter.decBytesWritten(v)
+    _recordsWritten.set(_recordsWritten.value - v)
+  }
+  override private[spark] def incRecordsWritten(v: Long): Unit = {
+    metricsReporter.incRecordsWritten(v)
+    _recordsWritten.add(v)
+  }
+  override private[spark] def incWriteTime(v: Long): Unit = {
+    metricsReporter.incWriteTime(v)
+    _writeTime.add(v)
+  }
+  override private[spark] def decBytesWritten(v: Long): Unit = {
+    metricsReporter.decBytesWritten(v)
+    _bytesWritten.set(_bytesWritten.value - v)
+  }
+}
+
+private[spark] object SQLShuffleWriteMetricsReporter {
+  val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten"
+  val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten"
+  val SHUFFLE_WRITE_TIME = "shuffleWriteTime"
--- End diff --

do we have other time metrics using nanoseconds?
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239059162

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala ---
@@ -163,6 +171,8 @@ object SQLMetrics {
       Utils.bytesToString
     } else if (metricsType == TIMING_METRIC) {
       Utils.msDurationToString
+    } else if (metricsType == NS_TIMING_METRIC) {
+      duration => Utils.msDurationToString(duration / 1000 / 1000)
--- End diff --

will this string lose the nanosecond precision?
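For reference, the `duration / 1000 / 1000` integer division above truncates everything below one millisecond, so the rendered string indeed cannot show nanosecond precision. A plain Scala illustration (not Spark code):

```scala
object NsTimingPrecision {
  def main(args: Array[String]): Unit = {
    val durationNs = 1234567L                 // 1.234567 ms measured in nanoseconds
    val durationMs = durationNs / 1000 / 1000 // integer division
    // The sub-millisecond part is dropped entirely, so any display string
    // built from durationMs cannot show microsecond or nanosecond precision.
    assert(durationMs == 1L)
  }
}
```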
[GitHub] spark issue #23213: [SPARK-26262][SQL] Runs SQLQueryTestSuite on mixed confi...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23213

how about `wholeStage=false, factoryMode=CODE_ONLY`? I think it's different from `wholeStage=false, factoryMode=NO_CODEGEN`.
[GitHub] spark issue #23213: [SPARK-26262][SQL] Runs SQLQueryTestSuite on mixed confi...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23213

that's a lot of time... Can we think more about the combination of codegen and wholeStage? When we turn on whole stage codegen but turn off codegen, what will happen?
[GitHub] spark issue #23222: [SPARK-20636] Add the rule TransposeWindow to the optimi...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23222

That PR also added an end-to-end test; does this mean that test is not valid?
[GitHub] spark pull request #17899: [SPARK-20636] Add new optimization rule to transp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17899#discussion_r238950241

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -734,6 +734,28 @@ object CollapseWindow extends Rule[LogicalPlan] {
   }
 }

+/**
+ * Transpose Adjacent Window Expressions.
--- End diff --

why is this rule useful?
[GitHub] spark issue #23213: [SPARK-26262][SQL] Runs SQLQueryTestSuite on mixed confi...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23213

do you know how long `SQLQueryTestSuite` takes? We are making it 4 times longer here, so it's better to know the overhead.
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r238949362

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -181,62 +180,39 @@ case class RelationConversions(
     conf: SQLConf,
     sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
   private def isConvertible(relation: HiveTableRelation): Boolean = {
-    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-    serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
-      serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
+    isConvertible(relation.tableMeta)
   }

-  // Return true for Apache ORC and Hive ORC-related configuration names.
-  // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
-  private def isOrcProperty(key: String) =
-    key.startsWith("orc.") || key.contains(".orc.")
-
-  private def isParquetProperty(key: String) =
-    key.startsWith("parquet.") || key.contains(".parquet.")
-
-  private def convert(relation: HiveTableRelation): LogicalRelation = {
-    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-
-    // Consider table and storage properties. For properties existing in both sides, storage
-    // properties will supersede table properties.
-    if (serde.contains("parquet")) {
-      val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
-        relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
-          conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
-      sessionCatalog.metastoreCatalog
-        .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
-    } else {
-      val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++
-        relation.tableMeta.storage.properties
-      if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
-        sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-          relation,
-          options,
-          classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
-          "orc")
-      } else {
-        sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-          relation,
-          options,
-          classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
-          "orc")
-      }
-    }
+  private def isConvertible(tableMeta: CatalogTable): Boolean = {
+    val serde = tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+    serde.contains("parquet") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
+      serde.contains("orc") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   }

+  private val metastoreCatalog = sessionCatalog.metastoreCatalog
+
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan resolveOperators {
       // Write path
       case InsertIntoTable(r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists)
         // Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
         if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
           !r.isPartitioned && isConvertible(r) =>
-        InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists)
+        InsertIntoTable(metastoreCatalog.convert(r), partition,
+          query, overwrite, ifPartitionNotExists)

       // Read path
       case relation: HiveTableRelation
           if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
-        convert(relation)
+        metastoreCatalog.convert(relation)
+
+      // CTAS
+      case CreateTable(tableDesc, mode, Some(query))
+          if DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty &&
+            isConvertible(tableDesc) =>
--- End diff --

We usually don't write a migration guide for perf optimizations. Otherwise it's annoying to write one for each optimization and ask users to turn it off if something goes wrong. I think we only do that when there are known issues.
[GitHub] spark issue #23120: [SPARK-26151][SQL] Return partial results for bad CSV re...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23120

Hi @MaxGekk, since this changes the result (although it makes it better), do you mind adding a migration guide? thanks!
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r238909822

--- Diff: core/src/main/scala/org/apache/spark/shuffle/metrics.scala ---
@@ -50,3 +50,57 @@ private[spark] trait ShuffleWriteMetricsReporter {
   private[spark] def decBytesWritten(v: Long): Unit
   private[spark] def decRecordsWritten(v: Long): Unit
 }
+
+
+/**
+ * A proxy class of ShuffleWriteMetricsReporter which proxy all metrics updating to the input
+ * reporters.
+ */
+private[spark] class GroupedShuffleWriteMetricsReporter(
--- End diff --

For the write metrics, it's different. It's the default one that calls the SQL one, which needs to hack the default one to register external reporters. Maybe we should not change the read side, and just create a special `PairShuffleWriteMetricsReporter` to update both the SQL reporter and the default reporter.

Another idea is: `ShuffleDependency` carries a `reporter => reporter` function, instead of a reporter. Then we can create a SQL reporter which takes another reporter (similar to the read side), and put the SQL reporter's constructor in `ShuffleDependency`.
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r238908877

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -181,62 +180,39 @@ case class RelationConversions(
     conf: SQLConf,
     sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
   private def isConvertible(relation: HiveTableRelation): Boolean = {
-    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-    serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
-      serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
+    isConvertible(relation.tableMeta)
   }

-  // Return true for Apache ORC and Hive ORC-related configuration names.
-  // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
-  private def isOrcProperty(key: String) =
-    key.startsWith("orc.") || key.contains(".orc.")
-
-  private def isParquetProperty(key: String) =
-    key.startsWith("parquet.") || key.contains(".parquet.")
-
-  private def convert(relation: HiveTableRelation): LogicalRelation = {
-    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-
-    // Consider table and storage properties. For properties existing in both sides, storage
-    // properties will supersede table properties.
-    if (serde.contains("parquet")) {
-      val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
-        relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
-          conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
-      sessionCatalog.metastoreCatalog
-        .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
-    } else {
-      val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++
-        relation.tableMeta.storage.properties
-      if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
-        sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-          relation,
-          options,
-          classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
-          "orc")
-      } else {
-        sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-          relation,
-          options,
-          classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
-          "orc")
-      }
-    }
+  private def isConvertible(tableMeta: CatalogTable): Boolean = {
+    val serde = tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+    serde.contains("parquet") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
+      serde.contains("orc") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   }

+  private val metastoreCatalog = sessionCatalog.metastoreCatalog
+
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan resolveOperators {
       // Write path
       case InsertIntoTable(r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists)
         // Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
         if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
           !r.isPartitioned && isConvertible(r) =>
-        InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists)
+        InsertIntoTable(metastoreCatalog.convert(r), partition,
+          query, overwrite, ifPartitionNotExists)

       // Read path
       case relation: HiveTableRelation
           if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
-        convert(relation)
+        metastoreCatalog.convert(relation)
+
+      // CTAS
+      case CreateTable(tableDesc, mode, Some(query))
+          if DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty &&
+            isConvertible(tableDesc) =>
--- End diff --

I don't mind adding `HiveUtils.CONVERT_METASTORE_ORC_CTAS`; maybe we can do it in a followup?
[GitHub] spark issue #23210: [SPARK-26233][SQL] CheckOverflow when encoding a decimal...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23210

a late LGTM
[GitHub] spark issue #23217: [SPARK-25829][SQL][FOLLOWUP] Refactor MapConcat in order...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23217

thanks, merging to master!
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r238899698

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -181,62 +180,39 @@ case class RelationConversions(
     conf: SQLConf,
     sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
   private def isConvertible(relation: HiveTableRelation): Boolean = {
-    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-    serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
-      serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
+    isConvertible(relation.tableMeta)
   }

-  // Return true for Apache ORC and Hive ORC-related configuration names.
-  // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
-  private def isOrcProperty(key: String) =
-    key.startsWith("orc.") || key.contains(".orc.")
-
-  private def isParquetProperty(key: String) =
-    key.startsWith("parquet.") || key.contains(".parquet.")
-
-  private def convert(relation: HiveTableRelation): LogicalRelation = {
-    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-
-    // Consider table and storage properties. For properties existing in both sides, storage
-    // properties will supersede table properties.
-    if (serde.contains("parquet")) {
-      val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
-        relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
-          conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
-      sessionCatalog.metastoreCatalog
-        .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
-    } else {
-      val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++
-        relation.tableMeta.storage.properties
-      if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
-        sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-          relation,
-          options,
-          classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
-          "orc")
-      } else {
-        sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-          relation,
-          options,
-          classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
-          "orc")
-      }
-    }
+  private def isConvertible(tableMeta: CatalogTable): Boolean = {
+    val serde = tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+    serde.contains("parquet") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
+      serde.contains("orc") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   }

+  private val metastoreCatalog = sessionCatalog.metastoreCatalog
+
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan resolveOperators {
       // Write path
       case InsertIntoTable(r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists)
         // Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
         if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
           !r.isPartitioned && isConvertible(r) =>
-        InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists)
+        InsertIntoTable(metastoreCatalog.convert(r), partition,
+          query, overwrite, ifPartitionNotExists)

       // Read path
       case relation: HiveTableRelation
           if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
-        convert(relation)
+        metastoreCatalog.convert(relation)
+
+      // CTAS
+      case CreateTable(tableDesc, mode, Some(query))
+          if DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty &&
+            isConvertible(tableDesc) =>
--- End diff --

It's not a new optimization... It's an optimization we dropped in 2.3 by mistake. I'm fine with adding a config with default value true.
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r238706120

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---
@@ -95,9 +77,127 @@ case class CreateHiveTableAsSelectCommand(
     Seq.empty[Row]
   }

+  // Returns `DataWritingCommand` used to write data when the table exists.
+  def writingCommandForExistingTable(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): DataWritingCommand
+
+  // Returns `DataWritingCommand` used to write data when the table doesn't exist.
+  def writingCommandForNewTable(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): DataWritingCommand
+
   override def argString: String = {
     s"[Database:${tableDesc.database}, " +
     s"TableName: ${tableDesc.identifier.table}, " +
     s"InsertIntoHiveTable]"
   }
 }
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the table description, which may contain serde, storage handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class CreateHiveTableAsSelectCommand(
+    tableDesc: CatalogTable,
+    query: LogicalPlan,
+    outputColumnNames: Seq[String],
+    mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  override def writingCommandForExistingTable(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): DataWritingCommand = {
+    InsertIntoHiveTable(
+      tableDesc,
+      Map.empty,
+      query,
+      overwrite = false,
+      ifPartitionNotExists = false,
+      outputColumnNames = outputColumnNames)
+  }
+
+  override def writingCommandForNewTable(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): DataWritingCommand = {
+    // For CTAS, there is no static partition values to insert.
+    val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap
+    InsertIntoHiveTable(
+      tableDesc,
+      partition,
+      query,
+      overwrite = true,
+      ifPartitionNotExists = false,
+      outputColumnNames = outputColumnNames)
+  }
+}
+
+/**
+ * Create table and insert the query result into it. This creates Hive table but inserts
+ * the query result into it by using data source.
+ *
+ * @param tableDesc the table description, which may contain serde, storage handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class OptimizedCreateHiveTableAsSelectCommand(
+    tableDesc: CatalogTable,
+    query: LogicalPlan,
+    outputColumnNames: Seq[String],
+    mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  private def getHadoopRelation(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): HadoopFsRelation = {
+    val metastoreCatalog = catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
+    val hiveTable = DDLUtils.readHiveTable(tableDesc)
+
+    metastoreCatalog.convert(hiveTable) match {
+      case LogicalRelation(t: HadoopFsRelation, _, _, _) => t
+      case _ => throw new AnalysisException(s"$tableIdentifier should be converted to " +
+        "HadoopFsRelation.")
+    }
+  }
+
+  override def writingCommandForExistingTable(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): DataWritingCommand = {
+    val hadoopRelation = getHadoopRelation(catalog, tableDesc)
+    InsertIntoHadoopFsRelationCommand(
+      hadoopRelation.location.rootPaths.head,
+      Map.empty, // We don't support to convert partitioned table.
+      false,
+      Seq.empty, // We don't support to convert partitioned table.
+      hadoopRelation.bucketSpec,
+      hadoopRelation.fileFormat,
+      hadoopRelation.options,
+      query,
+      mode,
+      Some(tableDesc),
+      Some(hadoopRelation.location),
+      query.output.map(_.name))
+  }
+
+  override def writingCommandForNewTable(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable): DataWritingCommand = {
+    val hadoopRelation = getHadoopRelation(catalog, tableDesc)
+    InsertIntoHadoopFsRelationCommand(
+      hadoopRelation.location.rootPaths.head,
+      Map.empty, // We don't support to convert partitioned table.
+      false,
+      Seq.empty, // We don't support to convert partitioned table.
+      hadoopRelation.bucketSpec,
+      hadoopRelation.fileFormat,
+      hadoopRelation.optio
[GitHub] spark pull request #23217: [SPARK-25829][SQL][FOLLOWUP] Refactor MapConcat i...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23217#discussion_r238698103

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
@@ -47,13 +48,17 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria
   private lazy val keyGetter = InternalRow.getAccessor(keyType)
   private lazy val valueGetter = InternalRow.getAccessor(valueType)

-  def put(key: Any, value: Any): Unit = {
+  def put(key: Any, value: Any, withSizeCheck: Boolean = false): Unit = {
     if (key == null) {
       throw new RuntimeException("Cannot use null as map key.")
     }
     val index = keyToIndex.getOrDefault(key, -1)
     if (index == -1) {
+      if (withSizeCheck && size >= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
--- End diff --

hmmm, I'd like to avoid premature optimization. Actually, how much perf can this save? This code block is already doing some heavy work.
[GitHub] spark pull request #23217: [SPARK-25829][SQL][FOLLOWUP] Refactor MapConcat i...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23217#discussion_r238651534

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
@@ -47,13 +48,17 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria
   private lazy val keyGetter = InternalRow.getAccessor(keyType)
   private lazy val valueGetter = InternalRow.getAccessor(valueType)

-  def put(key: Any, value: Any): Unit = {
+  def put(key: Any, value: Any, withSizeCheck: Boolean = false): Unit = {
     if (key == null) {
       throw new RuntimeException("Cannot use null as map key.")
     }
     val index = keyToIndex.getOrDefault(key, -1)
     if (index == -1) {
+      if (withSizeCheck && size >= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+        throw new RuntimeException(s"Unsuccessful attempt to concat maps with $size elements " +
--- End diff --

nit: `concat` -> `build`
[GitHub] spark pull request #23217: [SPARK-25829][SQL][FOLLOWUP] Refactor MapConcat i...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23217#discussion_r238651421

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
@@ -47,13 +48,17 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria
   private lazy val keyGetter = InternalRow.getAccessor(keyType)
   private lazy val valueGetter = InternalRow.getAccessor(valueType)

-  def put(key: Any, value: Any): Unit = {
+  def put(key: Any, value: Any, withSizeCheck: Boolean = false): Unit = {
     if (key == null) {
       throw new RuntimeException("Cannot use null as map key.")
     }
     val index = keyToIndex.getOrDefault(key, -1)
     if (index == -1) {
+      if (withSizeCheck && size >= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
--- End diff --

I think we should always check the size. Such a big map is very likely to cause problems.
[GitHub] spark issue #23217: [SPARK-25829][SQL][FOLLOWUP] Refactor MapConcat in order...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23217

thanks for the cleanup!
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238650207

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala ---
@@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
     assert(requiredChildDistributions.length == children.length)
     assert(requiredChildOrderings.length == children.length)

+    val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect {
+      case a: Alias => (a.toAttribute, a)
+    }))
+
     // Ensure that the operator's children satisfy their output distribution requirements.
     children = children.zip(requiredChildDistributions).map {
-      case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
+      case (child, distribution) if child.outputPartitioning.satisfies(
+          distribution.mapExpressions(replaceAlias(_, aliasMap))) =>
--- End diff --

seems we are not on the same page... Let's make the example clearer. Assuming a `relation[a, b]`'s partitioning is `hash(a, b)`, then `Project(a as c, a as d, b, relation)`'s partitioning should be `[hash(c, b), hash(d, b)]`. It's like a flatMap.
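A small, self-contained Scala sketch of this flatMap-style expansion, using simplified stand-in types rather than Catalyst classes:

```scala
object AliasPartitioningSketch {
  type Attr = String
  case class HashPartitioning(keys: Seq[Attr])

  // Expand one partitioning into all partitionings obtainable by replacing
  // each key with any of its aliases (or keeping the key itself).
  def expand(part: HashPartitioning, aliases: Map[Attr, Seq[Attr]]): Seq[HashPartitioning] = {
    val candidatesPerKey = part.keys.map(k => aliases.getOrElse(k, Seq(k)))
    // Cartesian product over the per-key candidates.
    candidatesPerKey
      .foldLeft(Seq(Seq.empty[Attr])) { (acc, cands) =>
        for (prefix <- acc; c <- cands) yield prefix :+ c
      }
      .map(HashPartitioning(_))
  }

  def main(args: Array[String]): Unit = {
    // relation[a, b] partitioned by hash(a, b); Project(a as c, a as d, b, relation):
    val expanded = expand(HashPartitioning(Seq("a", "b")), Map("a" -> Seq("c", "d")))
    assert(expanded == Seq(
      HashPartitioning(Seq("c", "b")),
      HashPartitioning(Seq("d", "b"))))
  }
}
```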
[GitHub] spark issue #22468: [SPARK-25374][SQL] SafeProjection supports fallback to a...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22468

thanks, merging to master!
[GitHub] spark issue #23213: [SPARK-26262][SQL] Run SQLQueryTestSuite with WHOLESTAGE...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23213

We should create such a framework when we need to have per-file config settings for testing.
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238634730

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala ---
@@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
     assert(requiredChildDistributions.length == children.length)
     assert(requiredChildOrderings.length == children.length)

+    val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect {
+      case a: Alias => (a.toAttribute, a)
+    }))
+
     // Ensure that the operator's children satisfy their output distribution requirements.
     children = children.zip(requiredChildDistributions).map {
-      case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
+      case (child, distribution) if child.outputPartitioning.satisfies(
+          distribution.mapExpressions(replaceAlias(_, aliasMap))) =>
--- End diff --

For `Project(a as c, a as d, b, relation)`, I think the `outputPartitioning` should be `[hash part c, hash part d, hash part b]`. The point is, we should not report an output partitioning whose attribute is not even in the current plan's output.
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r238633725

--- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
@@ -92,6 +92,12 @@ private[spark] class ShuffleMapTask(
       threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
     } else 0L

+    // Register the shuffle write metrics reporter to shuffleWriteMetrics.
+    if (dep.shuffleWriteMetricsReporter.isDefined) {
+      context.taskMetrics().shuffleWriteMetrics.registerExternalShuffleWriteReporter(
--- End diff --

a simpler idea:
1. create a `class GroupedShuffleWriteMetricsReporter(reporters: Seq[ShuffleWriteMetricsReporter]) extends ShuffleWriteMetricsReporter`, which proxies all the metric updates to the input reporters.
2. create a `GroupedShuffleWriteMetricsReporter` instance here: `new GroupedShuffleWriteMetricsReporter(Seq(dep.shuffleWriteMetricsReporter.get, context.taskMetrics().shuffleWriteMetrics))`, and pass it to `manager.getWriter`.

I think we can use the same approach for the read metrics as well.
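A minimal Scala sketch of the proposed proxy (the trait is trimmed to the inc-methods that appear in this thread; this is not the actual Spark API):

```scala
// Trimmed-down reporter interface, for illustration only.
trait ShuffleWriteMetricsReporter {
  def incBytesWritten(v: Long): Unit
  def incRecordsWritten(v: Long): Unit
  def incWriteTime(v: Long): Unit
}

// Fan every metrics update out to all registered reporters.
class GroupedShuffleWriteMetricsReporter(reporters: Seq[ShuffleWriteMetricsReporter])
  extends ShuffleWriteMetricsReporter {
  override def incBytesWritten(v: Long): Unit = reporters.foreach(_.incBytesWritten(v))
  override def incRecordsWritten(v: Long): Unit = reporters.foreach(_.incRecordsWritten(v))
  override def incWriteTime(v: Long): Unit = reporters.foreach(_.incWriteTime(v))
}
```

The writer would then be constructed with something like `new GroupedShuffleWriteMetricsReporter(Seq(sqlReporter, taskMetricsReporter))`, so neither reporter needs to know about the other.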
[GitHub] spark pull request #23207: [SPARK-26193][SQL] Implement shuffle write metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r238630996

--- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
@@ -92,6 +92,12 @@ private[spark] class ShuffleMapTask(
       threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
     } else 0L

+    // Register the shuffle write metrics reporter to shuffleWriteMetrics.
+    if (dep.shuffleWriteMetricsReporter.isDefined) {
+      context.taskMetrics().shuffleWriteMetrics.registerExternalShuffleWriteReporter(
--- End diff --

This happens per-task; I think `ShuffleWriteMetrics.externalReporters` can be an option instead of an array.
[GitHub] spark issue #23215: [SPARK-26263][SQL] Throw exception when Partition column...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23215

I think this new behavior makes more sense, but we need to add a migration guide.
[GitHub] spark pull request #23213: [SPARK-26262][SQL] Run SQLQueryTestSuite with WHO...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23213#discussion_r238627633

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
@@ -2899,6 +2899,144 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  private def checkKeywordsExistsInExplain(df: DataFrame, keywords: String*): Unit = {
+    val output = new java.io.ByteArrayOutputStream()
+    Console.withOut(output) {
+      df.explain(extended = true)
+    }
+    val normalizedOutput = output.toString.replaceAll("#\\d+", "#x")
+    for (key <- keywords) {
+      assert(normalizedOutput.contains(key))
+    }
+  }
+
+  test("optimized plan should show the rewritten aggregate expression") {
--- End diff --

all the explain-related tests.
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238626367

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala ---
@@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
     assert(requiredChildDistributions.length == children.length)
     assert(requiredChildOrderings.length == children.length)

+    val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect {
+      case a: Alias => (a.toAttribute, a)
+    }))
+
     // Ensure that the operator's children satisfy their output distribution requirements.
     children = children.zip(requiredChildDistributions).map {
-      case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
+      case (child, distribution) if child.outputPartitioning.satisfies(
+          distribution.mapExpressions(replaceAlias(_, aliasMap))) =>
--- End diff --

for example, `relation[a, b]`'s output partitioning is `[hash partition a, hash partition b]`, and `Project(a as c, b, relation)`'s output partitioning should be `[hash partition c, hash partition b]`. What do you mean by `But ProjectExec.outputPartitioning cannot contain a reference to the aliases`?
[GitHub] spark pull request #23213: [SPARK-26262][SQL] Run SQLQueryTestSuite with WHO...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23213#discussion_r238625581

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---
@@ -144,9 +144,10 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
     val (comments, code) = input.split("\n").partition(_.startsWith("--"))

     // Runs all the tests on both codegen-only and interpreter modes
-    val codegenConfigSets = Array(CODEGEN_ONLY, NO_CODEGEN).map {
-      case codegenFactoryMode =>
-        Array(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenFactoryMode.toString)
+    val codegenConfigSets = Array(("false", "NO_CODEGEN"), ("true", "CODEGEN_ONLY")).map {
--- End diff --

shall we test all the combinations? e.g. `wholeStage=on, codegen=off`
[GitHub] spark pull request #23213: [SPARK-26262][SQL] Run SQLQueryTestSuite with WHO...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23213#discussion_r238625268

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
@@ -2899,6 +2899,144 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  private def checkKeywordsExistsInExplain(df: DataFrame, keywords: String*): Unit = {
+    val output = new java.io.ByteArrayOutputStream()
+    Console.withOut(output) {
+      df.explain(extended = true)
+    }
+    val normalizedOutput = output.toString.replaceAll("#\\d+", "#x")
+    for (key <- keywords) {
+      assert(normalizedOutput.contains(key))
+    }
+  }
+
+  test("optimized plan should show the rewritten aggregate expression") {
--- End diff --

can we move them to `ExplainSuite`?
[GitHub] spark issue #23214: [SPARK-26155] Optimizing the performance of LongToUnsafe...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23214

I think there is a problem, but no one found it because it only affects metrics.
[GitHub] spark issue #23207: [SPARK-26193][SQL] Implement shuffle write metrics in SQ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23207

Can you share some ideas about it? IMO shuffle write metrics are hard, as an RDD can have shuffle dependencies with multiple upstream RDDs. So, in general, the shuffle write metrics should belong to the upstream RDDs. In Spark SQL, it's a little simpler, as the `ShuffledRowRDD` always has only one child, so it's reasonable to say that shuffle write metrics belong to `ShuffledRowRDD`. That said, we need to design a not-so-general shuffle write metrics API in Spark core, which will only be used in Spark SQL.
[GitHub] spark issue #23214: [SPARK-26155] Optimizing the performance of LongToUnsafe...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23214

It's easy to track `numKeyLookups` at `HashedRelation`, but it's hard to track `numProbes`. One idea is: we pass a `MutableInt` to `LongToUnsafeRowMap.getValue` as a parameter, and in the method we set the actual `numProbes` of this lookup into the `MutableInt` parameter.
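A sketch of that idea in Scala, with hypothetical simplified types (the real `getValue` returns row data and the probing logic is more involved):

```scala
// Caller-supplied holder the map writes each lookup's probe count into.
final class MutableInt(var value: Int = 0)

// Stand-in for the shared, read-only map (hypothetical simplified signature).
class SharedLongMap {
  def getValue(key: Long, probesOut: MutableInt): Long = {
    // ...real probing happens here; pretend every lookup takes 2 probes.
    probesOut.value = 2
    key * 10
  }
}

// Stand-in for the per-task HashedRelation wrapper that owns the metrics,
// so the map shared across tasks stays metrics-free.
class PerTaskRelation(map: SharedLongMap) {
  private var numKeyLookups = 0L
  private var numProbes = 0L
  private val holder = new MutableInt

  def get(key: Long): Long = {
    val v = map.getValue(key, holder)
    numKeyLookups += 1
    numProbes += holder.value
    v
  }

  def avgHashProbe: Double = numProbes.toDouble / numKeyLookups
}
```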
[GitHub] spark issue #23214: [SPARK-26155] Optimizing the performance of LongToUnsafe...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23214

I might know the root cause: `LongToUnsafeRowMap` is actually accessed by multiple threads. For broadcast hash join, we will copy the broadcasted hash relation to avoid multi-thread problems, via `HashedRelation.asReadOnlyCopy`. However, this is a shallow copy; the `LongToUnsafeRowMap` is not copied and is likely shared by multiple `HashedRelation`s. The metrics are per-task, so I think a better fix is to track the hash probe metrics per `HashedRelation`, instead of per `LongToUnsafeRowMap`. It's too costly to copy the `LongToUnsafeRowMap`, so we should think about how to do it efficiently. cc @hvanhovell
[GitHub] spark pull request #23214: [SPARK-26155] Optimizing the performance of LongT...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23214#discussion_r238549645

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---
@@ -398,8 +399,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
   private var numKeys = 0L

   // Tracking average number of probes per key lookup.
-  private var numKeyLookups = 0L
-  private var numProbes = 0L
+  private var numKeyLookups = new LongAdder
+  private var numProbes = new LongAdder
--- End diff --

I'm surprised. I think `LongToUnsafeRowMap` is used in a single-thread environment, and multi-thread contention should not be an issue here. Do you have any insights about how this fixes the perf issue?
[GitHub] spark issue #23214: [SPARK-26155] Optimizing the performance of LongToUnsafe...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23214

ok to test
[GitHub] spark issue #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts, ints
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23171

How about we create an `OptimizedIn`, and convert `In` to `OptimizedIn` if the list is all literals? `OptimizedIn` will pick `switch` or a hash set based on the length of the list.
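An interpreted Scala analogue of that dispatch, to illustrate the shape of the idea (the threshold and names are made up; in generated code the small-list branch would be an actual `switch` statement):

```scala
object OptimizedInSketch {
  val switchThreshold = 10 // illustrative threshold, not a Spark config

  // "Compile" an all-literal IN list into a membership predicate.
  def compile(list: Seq[Int]): Int => Boolean = {
    if (list.length <= switchThreshold) {
      // Small list: a direct comparison chain, which codegen would emit
      // as a JVM tableswitch/lookupswitch over the literals.
      value => list.exists(_ == value)
    } else {
      // Large list: build the hash set once, O(1) per lookup afterwards.
      val set = list.toSet
      value => set.contains(value)
    }
  }

  def main(args: Array[String]): Unit = {
    val in = compile(Seq(1, 2, 3))
    assert(in(2) && !in(5))
  }
}
```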
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r238534101

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala ---
@@ -535,4 +535,98 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers with PlanTestB
     assert(unsafeRow.getSizeInBytes ==
       8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + roundedSize(field2.getSizeInBytes))
   }
+
+  testBothCodegenAndInterpreted("SPARK-25374 converts back into safe representation") {
+    def convertBackToInternalRow(inputRow: InternalRow, fields: Array[DataType]): InternalRow = {
+      val unsafeProj = UnsafeProjection.create(fields)
+      val unsafeRow = unsafeProj(inputRow)
+      val safeProj = SafeProjection.create(fields)
+      safeProj(unsafeRow)
+    }
+
+    // Simple tests
+    val inputRow = InternalRow.fromSeq(Seq(
+      false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, UTF8String.fromString("test"),
+      Decimal(255), CalendarInterval.fromString("interval 1 day"), Array[Byte](1, 2)
+    ))
+    val fields1 = Array(
+      BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType,
+      DoubleType, StringType, DecimalType.defaultConcreteType, CalendarIntervalType,
+      BinaryType)
+
+    assert(convertBackToInternalRow(inputRow, fields1) === inputRow)
+
+    // Array tests
+    val arrayRow = InternalRow.fromSeq(Seq(
+      createArray(1, 2, 3),
+      createArray(
+        createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*),
+        createArray(Seq("d").map(UTF8String.fromString): _*))
+    ))
+    val fields2 = Array[DataType](
+      ArrayType(IntegerType),
+      ArrayType(ArrayType(StringType)))
+
+    assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow)
+
+    // Struct tests
+    val structRow = InternalRow.fromSeq(Seq(
+      InternalRow.fromSeq(Seq[Any](1, 4.0)),
+      InternalRow.fromSeq(Seq(
+        UTF8String.fromString("test"),
+        InternalRow.fromSeq(Seq(
+          1,
+          createArray(Seq("2", "3").map(UTF8String.fromString): _*)
+        ))
+      ))
+    ))
+    val fields3 = Array[DataType](
+      StructType(
+        StructField("c0", IntegerType) ::
+        StructField("c1", DoubleType) ::
+        Nil),
+      StructType(
+        StructField("c2", StringType) ::
+        StructField("c3", StructType(
+          StructField("c4", IntegerType) ::
+          StructField("c5", ArrayType(StringType)) ::
+          Nil)) ::
+        Nil))
+
+    assert(convertBackToInternalRow(structRow, fields3) === structRow)
+
+    // Map tests
+    val mapRow = InternalRow.fromSeq(Seq(
+      createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2),
+      createMap(
+        createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*),
+        createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*)
+      )(
+        createMap(Seq("k3", "k4").map(UTF8String.fromString): _*)(3.toShort, 4.toShort),
+        createMap(Seq("k5", "k6").map(UTF8String.fromString): _*)(5.toShort, 6.toShort)
+      )))
+    val fields4 = Array[DataType](
+      MapType(StringType, IntegerType),
+      MapType(MapType(IntegerType, StringType), MapType(StringType, ShortType)))
+
+    val mapResultRow = convertBackToInternalRow(mapRow, fields4).toSeq(fields4)
+    val mapExpectedRow = mapRow.toSeq(fields4)
+    // Since `ArrayBasedMapData` does not override `equals` and `hashCode`,
--- End diff --

Or we can use `ExpressionEvalHelper.checkResult` here.
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r238533700

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala ---
@@ -535,4 +535,98 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers with PlanTestB
     assert(unsafeRow.getSizeInBytes ==
       8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + roundedSize(field2.getSizeInBytes))
   }
+
+  testBothCodegenAndInterpreted("SPARK-25374 converts back into safe representation") {
+    def convertBackToInternalRow(inputRow: InternalRow, fields: Array[DataType]): InternalRow = {
+      val unsafeProj = UnsafeProjection.create(fields)
+      val unsafeRow = unsafeProj(inputRow)
+      val safeProj = SafeProjection.create(fields)
+      safeProj(unsafeRow)
+    }
+
+    // Simple tests
+    val inputRow = InternalRow.fromSeq(Seq(
+      false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, UTF8String.fromString("test"),
+      Decimal(255), CalendarInterval.fromString("interval 1 day"), Array[Byte](1, 2)
+    ))
+    val fields1 = Array(
+      BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType,
+      DoubleType, StringType, DecimalType.defaultConcreteType, CalendarIntervalType,
+      BinaryType)
+
+    assert(convertBackToInternalRow(inputRow, fields1) === inputRow)
+
+    // Array tests
+    val arrayRow = InternalRow.fromSeq(Seq(
+      createArray(1, 2, 3),
+      createArray(
+        createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*),
+        createArray(Seq("d").map(UTF8String.fromString): _*))
+    ))
+    val fields2 = Array[DataType](
+      ArrayType(IntegerType),
+      ArrayType(ArrayType(StringType)))
+
+    assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow)
+
+    // Struct tests
+    val structRow = InternalRow.fromSeq(Seq(
+      InternalRow.fromSeq(Seq[Any](1, 4.0)),
+      InternalRow.fromSeq(Seq(
+        UTF8String.fromString("test"),
+        InternalRow.fromSeq(Seq(
+          1,
+          createArray(Seq("2", "3").map(UTF8String.fromString): _*)
+        ))
+      ))
+    ))
+    val fields3 = Array[DataType](
+      StructType(
+        StructField("c0", IntegerType) ::
+        StructField("c1", DoubleType) ::
+        Nil),
+      StructType(
+        StructField("c2", StringType) ::
+        StructField("c3", StructType(
+          StructField("c4", IntegerType) ::
+          StructField("c5", ArrayType(StringType)) ::
+          Nil)) ::
+        Nil))
+
+    assert(convertBackToInternalRow(structRow, fields3) === structRow)
+
+    // Map tests
+    val mapRow = InternalRow.fromSeq(Seq(
+      createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2),
+      createMap(
+        createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*),
+        createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*)
+      )(
+        createMap(Seq("k3", "k4").map(UTF8String.fromString): _*)(3.toShort, 4.toShort),
+        createMap(Seq("k5", "k6").map(UTF8String.fromString): _*)(5.toShort, 6.toShort)
+      )))
+    val fields4 = Array[DataType](
+      MapType(StringType, IntegerType),
+      MapType(MapType(IntegerType, StringType), MapType(StringType, ShortType)))
+
+    val mapResultRow = convertBackToInternalRow(mapRow, fields4).toSeq(fields4)
+    val mapExpectedRow = mapRow.toSeq(fields4)
+    // Since `ArrayBasedMapData` does not override `equals` and `hashCode`,
--- End diff --

maybe we should implement `equals` and `hashCode` in `ArrayBasedMapData` and `UnsafeMapData`.
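For reference, the kind of structural equality being suggested might look like the following sketch over a simplified map-data type (the real classes hold `ArrayData` of arbitrary Catalyst types, so this only shows the shape of the change):

```scala
// Sketch: order-sensitive structural equality over the (keys, values)
// arrays, mirroring how array-based map data stores entries in insertion
// order. Element type is simplified to Any for illustration.
class MapDataSketch(val keys: Array[Any], val values: Array[Any]) {
  override def equals(other: Any): Boolean = other match {
    case that: MapDataSketch =>
      keys.sameElements(that.keys) && values.sameElements(that.values)
    case _ => false
  }

  override def hashCode: Int =
    31 * java.util.Arrays.hashCode(keys.map(_.asInstanceOf[AnyRef])) +
      java.util.Arrays.hashCode(values.map(_.asInstanceOf[AnyRef]))
}
```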
[GitHub] spark issue #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts, ints
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23171 I think `InSet` is not an optimized version of `In`, but just a way to separate the implementations for different conditions (the length of the list). Maybe we should do the same thing here: create an `InSwitch` and convert `In` to it when certain conditions are met. One problem is that `In` and `InSwitch` would be the same in the interpreted version, so maybe we should create a base class for them. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
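For illustration, a minimal Scala sketch of the two dispatch strategies under discussion (`InSwitch` is only a proposed name here, and real Spark codegen emits Java source, but the JVM mechanics are the same): a pattern match on constant ints compiles down to a tableswitch/lookupswitch instruction, while a hash set pays for hashing on every call.

    // Switch-style dispatch: compiled to a JVM switch instruction.
    def inSwitch(v: Int): Boolean = v match {
      case 1 | 5 | 9 => true
      case _ => false
    }

    // Hash-set dispatch, as InSet does: O(1) on average, but the constant
    // factor of hashing can lose to a switch when the list is small.
    val candidates = Set(1, 5, 9)
    def inSet(v: Int): Boolean = candidates.contains(v)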
[GitHub] spark issue #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts, ints
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23171 @rxin I proposed the same thing before, but one problem is that we only convert `In` to `InSet` when the length of the list reaches the threshold. If the `switch` approach is faster than a hash set when the list is small, it still seems worthwhile to optimize `In` with `switch`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23212: [SPARK-25498][SQL][FOLLOW-UP] Return an empty config set...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23212 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23208#discussion_r238524973 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java --- @@ -25,14 +25,14 @@ import org.apache.spark.sql.types.StructType; /** - * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to + * A mix-in interface for {@link Table}. Data sources can implement this interface to * provide data writing ability for batch processing. * * This interface is used to create {@link BatchWriteSupport} instances when end users run --- End diff -- I don't have a better name in mind, so I'll leave it as `WriteSupport` for now. Suggestions for a better name that matches `Scan` are welcome! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238524763 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) +val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { + case a: Alias => (a.toAttribute, a) +})) + // Ensure that the operator's children satisfy their output distribution requirements. children = children.zip(requiredChildDistributions).map { - case (child, distribution) if child.outputPartitioning.satisfies(distribution) => + case (child, distribution) if child.outputPartitioning.satisfies( + distribution.mapExpressions(replaceAlias(_, aliasMap))) => --- End diff -- As an example, `ProjectExec.outputPartitioning` can be wrong, as it doesn't consider the aliases in the project list. I think it's clearer to adjust the `outputPartitioning` there, instead of dealing with it in a rule. What if more rules need to check `outputPartitioning` and `requiredChildDistribution`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
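A rough sketch of the alternative suggested above, assuming Catalyst's `HashPartitioning`, `Alias`, and `AttributeMap` (a hypothetical helper, not the actual Spark implementation): rewrite the child's partitioning expressions through the aliases in the project list, so that a child hash-partitioned by `a` is reported as hash-partitioned by `a2` after `SELECT a AS a2`.

    import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, NamedExpression}
    import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}

    def projectedPartitioning(
        childPartitioning: Partitioning,
        projectList: Seq[NamedExpression]): Partitioning = {
      // Map each aliased attribute to the attribute it is renamed to.
      val aliasMap = AttributeMap(projectList.collect {
        case a @ Alias(attr: Attribute, _) => attr -> a.toAttribute
      })
      childPartitioning match {
        case h @ HashPartitioning(exprs, _) =>
          // Substitute renamed attributes in the partitioning expressions.
          h.copy(expressions = exprs.map(_.transform {
            case attr: Attribute if aliasMap.contains(attr) => aliasMap(attr)
          }))
        case other => other
      }
    }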
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r238520267 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala --- @@ -535,4 +535,100 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers with PlanTestB assert(unsafeRow.getSizeInBytes == 8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + roundedSize(field2.getSizeInBytes)) } + + testBothCodegenAndInterpreted("SPARK-25374 converts back into safe representation") { +def convertBackToInternalRow(inputRow: InternalRow, fields: Array[DataType]): InternalRow = { + val unsafeProj = UnsafeProjection.create(fields) + val unsafeRow = unsafeProj(inputRow) + val safeProj = SafeProjection.create(fields) + safeProj(unsafeRow) +} + +// Simple tests +val inputRow = InternalRow.fromSeq(Seq( + false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, UTF8String.fromString("test"), + Decimal(255), CalendarInterval.fromString("interval 1 day"), Array[Byte](1, 2) +)) +val fields1 = Array( + BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, + DoubleType, StringType, DecimalType.defaultConcreteType, CalendarIntervalType, + BinaryType) + +assert(convertBackToInternalRow(inputRow, fields1) === inputRow) + +// Array tests +val arrayRow = InternalRow.fromSeq(Seq( + createArray(1, 2, 3), + createArray( +createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*), +createArray(Seq("d").map(UTF8String.fromString): _*)) +)) +val fields2 = Array[DataType]( + ArrayType(IntegerType), + ArrayType(ArrayType(StringType))) + +assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow) + +// Struct tests +val structRow = InternalRow.fromSeq(Seq( + InternalRow.fromSeq(Seq[Any](1, 4.0)), + InternalRow.fromSeq(Seq( +UTF8String.fromString("test"), +InternalRow.fromSeq(Seq( + 1, + createArray(Seq("2", "3").map(UTF8String.fromString): _*) +)) + )) +)) +val fields3 = Array[DataType]( + StructType( +StructField("c0", IntegerType) :: +StructField("c1", DoubleType) :: +Nil), + StructType( +StructField("c2", StringType) :: +StructField("c3", StructType( + StructField("c4", IntegerType) :: + StructField("c5", ArrayType(StringType)) :: + Nil)) :: +Nil)) + +assert(convertBackToInternalRow(structRow, fields3) === structRow) + +// Map tests +val mapRow = InternalRow.fromSeq(Seq( + createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2), + createMap( +createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*), +createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*) + )( +createMap(Seq("k3", "k4").map(UTF8String.fromString): _*)(3.toShort, 4.toShort), +createMap(Seq("k5", "k6").map(UTF8String.fromString): _*)(5.toShort, 6.toShort) + ))) +val fields4 = Array[DataType]( + MapType(StringType, IntegerType), + MapType(MapType(IntegerType, StringType), MapType(StringType, ShortType))) + +// Since `ArrayBasedMapData` does not override `equals` and `hashCode`, +// we need to take care of it to compare rows. +def toComparable(d: Any): Any = d match { --- End diff -- this does nothing, doesn't it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r238515544 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala --- @@ -106,4 +106,19 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT assert(proj(input).toSeq(StructType.fromDDL("c0 int, c1 int")) === expected) } } + + test("SPARK-25374 Correctly handles NoOp in SafeProjection") { +val exprs = Seq(Add(BoundReference(0, IntegerType, nullable = true), Literal.create(1)), NoOp) +val input = InternalRow.fromSeq(1 :: 1 :: Nil) +val expected = 2 :: null :: Nil +withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) { --- End diff -- nvm, this is the code style in this test suite --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r238515444 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala --- @@ -106,4 +106,19 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT assert(proj(input).toSeq(StructType.fromDDL("c0 int, c1 int")) === expected) } } + + test("SPARK-25374 Correctly handles NoOp in SafeProjection") { +val exprs = Seq(Add(BoundReference(0, IntegerType, nullable = true), Literal.create(1)), NoOp) +val input = InternalRow.fromSeq(1 :: 1 :: Nil) +val expected = 2 :: null :: Nil +withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) { --- End diff -- can we use `testBothCodegenAndInterpreted`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r238515227 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala --- @@ -166,29 +166,40 @@ object UnsafeProjection } } -/** - * A projection that could turn UnsafeRow into GenericInternalRow --- End diff -- can we keep this comment? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23212: [SPARK-25498][SQL][FOLLOW-UP] Return an empty config set...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23212 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r238330712 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala --- @@ -157,4 +157,22 @@ object InternalRow { getValueNullSafe } } + + /** + * Returns a writer for an `InternalRow` with given data type. + */ + def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match { --- End diff -- We can rebase now --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22512: [SPARK-25498][SQL] InterpretedMutableProjection should h...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22512 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22512#discussion_r238328405 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala --- @@ -148,12 +156,25 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext { }) // When we are regenerating the golden files we don't need to run all the configs as they // all need to return the same result - if (regenerateGoldenFiles && configs.nonEmpty) { -configs.take(1) + if (regenerateGoldenFiles) { +if (configs.nonEmpty) { + configs.take(1) +} else { + Array.empty[Array[(String, String)]] --- End diff -- nit: since configs don't matter when generating the result, I think we can just return empty configs here. We can clean this up in a follow-up PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of `Colum...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23152 thanks, merging to master/2.4! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23204: Revert "[SPARK-21052][SQL] Add hash map metrics t...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23204#discussion_r238323331 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala --- @@ -483,8 +470,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap */ def getValue(key: Long, resultRow: UnsafeRow): UnsafeRow = { if (isDense) { - numKeyLookups += 1 - numProbes += 1 --- End diff -- +1, like I said in https://github.com/apache/spark/pull/23204/files#r238257371 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23204: Revert "[SPARK-21052][SQL] Add hash map metrics t...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23204#discussion_r238322699 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala --- @@ -483,8 +470,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap */ def getValue(key: Long, resultRow: UnsafeRow): UnsafeRow = { if (isDense) { - numKeyLookups += 1 - numProbes += 1 --- End diff -- If this is proven to cause a perf regression, I think it's safer to revert the `HashAggregateExec` changes as well, since they are doing the same thing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238321460 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) +val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { + case a: Alias => (a.toAttribute, a) +})) + // Ensure that the operator's children satisfy their output distribution requirements. children = children.zip(requiredChildDistributions).map { - case (child, distribution) if child.outputPartitioning.satisfies(distribution) => + case (child, distribution) if child.outputPartitioning.satisfies( + distribution.mapExpressions(replaceAlias(_, aliasMap))) => --- End diff -- instead of doing it here, shall we deal with aliases in `SparkPlan.outputPartitioning` for operators with a project list? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r238318256 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -145,9 +145,14 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) +val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { --- End diff -- is it safe to do so? I think we should only collect aliases from operators with a project list, i.e. `Project`, `Aggregate`, `Window`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23208 cc @rdblue @rxin @jose-torres @gatorsmile @HyukjinKwon @gengliangwang --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23208#discussion_r238313454 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,52 +17,49 @@ package org.apache.spark.sql.execution.datasources.v2 -import java.util.UUID - -import scala.collection.JavaConverters._ +import java.util.{Optional, UUID} import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport import org.apache.spark.sql.types.StructType /** - * A logical plan representing a data source v2 scan. + * A logical plan representing a data source v2 table. * - * @param source An instance of a [[DataSourceV2]] implementation. - * @param options The options for this scan. Used to create fresh [[BatchWriteSupport]]. - * @param userSpecifiedSchema The user-specified schema for this scan. + * @param table The table that this relation represents. + * @param options The options for this table operation. It's used to create fresh [[ScanBuilder]] + *and [[BatchWriteSupport]]. */ case class DataSourceV2Relation( -// TODO: remove `source` when we finish API refactor for write. -source: TableProvider, -table: SupportsBatchRead, +table: Table, output: Seq[AttributeReference], -options: Map[String, String], -userSpecifiedSchema: Option[StructType] = None) +// TODO: use a simple case insensitive map instead. --- End diff -- I'll do it in my next PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23208#discussion_r238313221 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala --- @@ -241,32 +241,28 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { assertNotBucketed("save") -val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf) -if (classOf[DataSourceV2].isAssignableFrom(cls)) { - val source = cls.getConstructor().newInstance().asInstanceOf[DataSourceV2] - source match { -case provider: BatchWriteSupportProvider => - val sessionOptions = DataSourceV2Utils.extractSessionConfigs( -source, -df.sparkSession.sessionState.conf) - val options = sessionOptions ++ extraOptions - +val session = df.sparkSession +val cls = DataSource.lookupDataSource(source, session.sessionState.conf) +if (classOf[TableProvider].isAssignableFrom(cls)) { + val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider] + val sessionOptions = DataSourceV2Utils.extractSessionConfigs( +provider, session.sessionState.conf) + val options = sessionOptions ++ extraOptions + val dsOptions = new DataSourceOptions(options.asJava) + provider.getTable(dsOptions) match { +case table: SupportsBatchWrite => + val relation = DataSourceV2Relation.create(table, dsOptions) + // TODO: revisit it. We should not create the `AppendData` operator for `SaveMode.Append`. + // We should create new end-users APIs for the `AppendData` operator. --- End diff -- according to the discussion in https://github.com/apache/spark/pull/22688#issuecomment-428626027 , the behavior of the append operator and `SaveMode.Append` can be different. We should revisit this when we have the new end-user write APIs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/23208 [SPARK-25530][SQL] data source v2 API refactor (batch write) ## What changes were proposed in this pull request? Adjust the batch write API to match the read API refactor after https://github.com/apache/spark/pull/23086 Basically it renames `BatchWriteSupportProvider` to `SupportsBatchWrite`, and makes it extend `Table`. It also cleans up some code now that the batch API is complete. This PR also removes the test from https://github.com/apache/spark/pull/22688 . Now a data source must return a table for read/write. It's a little awkward to use with the `SaveMode`-based write APIs, as users can append data to a non-existing table. `TableProvider` needs to return a `Table` instance with an empty schema if the table doesn't exist, so that we can write to it later. Hopefully we can remove the `SaveMode`-based write APIs after the new APIs are finished and widely used. A few notes about future changes: 1. We will create `SupportsStreamingWrite` later for the streaming APIs 2. We will create `SupportsBatchReplaceWhere`, `SupportsBatchAppend`, etc. for the new end-user write APIs. I think the streaming APIs will continue to use `OutputMode`, and the new end-user write APIs will apply to batch only, at least in the near future. ## How was this patch tested? existing tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark refactor-batch Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23208.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23208 commit 00fc34fa793b922a48a4bf8e9f9cd0e3b688800b Author: Wenchen Fan Date: 2018-12-03T14:38:43Z data source v2 API refactor (batch write) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
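To make the refactored shape concrete, a minimal sketch of a source under the new interfaces (hypothetical; only `TableProvider`, `Table`, `SupportsBatchRead`, and `SupportsBatchWrite` come from this PR series, and the read/write builder methods are elided because their exact signatures are what these PRs are still settling):

    import org.apache.spark.sql.sources.v2.{DataSourceOptions, Table, TableProvider}
    import org.apache.spark.sql.types.StructType

    class MySource extends TableProvider {
      // Per this PR, a Table must be returned even if nothing exists yet, so
      // SaveMode-based writes can still target it.
      override def getTable(options: DataSourceOptions): Table = new MyTable
    }

    // Capabilities become mix-ins on the returned Table, e.g.:
    //   class MyTable extends Table with SupportsBatchRead with SupportsBatchWrite
    class MyTable extends Table {
      // Empty schema when the underlying table does not exist yet.
      override def schema(): StructType = new StructType()
    }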
[GitHub] spark pull request #23204: Revert "[SPARK-21052][SQL] Add hash map metrics t...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23204#discussion_r238264122 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala --- @@ -57,12 +57,6 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato override def add(v: Long): Unit = _value += v - // We can set a double value to `SQLMetric` which stores only long value, if it is --- End diff -- I think we can keep the changes in this file as well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23204: Revert "[SPARK-21052][SQL] Add hash map metrics to join"
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23204 I'm fine with reverting it if it caused a significant performance regression; we should revisit it later with different ideas, like updating the metrics for each batch instead of for each record. cc @viirya @gatorsmile --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
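A minimal sketch of the "per batch" idea (all names are hypothetical; `Counter` stands in for Spark's `SQLMetric`): bump cheap local variables on the per-record hot path and fold them into the shared metric once per batch.

    class Counter {
      private var v = 0L
      def add(n: Long): Unit = v += n
      def value: Long = v
    }

    val numKeyLookups = new Counter
    val numProbes = new Counter

    def processBatch(keys: Array[Long]): Unit = {
      // Hot loop: plain local counters, which the JIT can keep in registers.
      var localLookups = 0L
      var localProbes = 0L
      var i = 0
      while (i < keys.length) {
        localLookups += 1
        localProbes += 1 // plus extra probes on hash collisions
        i += 1
      }
      // Cold path: one metric update per batch instead of one per record.
      numKeyLookups.add(localLookups)
      numProbes.add(localProbes)
    }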
[GitHub] spark pull request #23204: Revert "[SPARK-21052][SQL] Add hash map metrics t...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23204#discussion_r23825 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala --- @@ -374,22 +374,6 @@ class TungstenAggregationIterator( } } - TaskContext.get().addTaskCompletionListener[Unit](_ => { --- End diff -- ditto, it's better to put this code block here; let's keep this change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23204: Revert "[SPARK-21052][SQL] Add hash map metrics t...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23204#discussion_r238257371 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala --- @@ -63,7 +63,7 @@ case class HashAggregateExec( "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"), "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"), "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time"), -"avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hash probe")) +"avgHashmapProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hashmap probe")) --- End diff -- I know it's easy to just run the `git revert` command, but I'd like to revert it manually, since that PR was merged a long time ago. And we should still keep changes like this renaming, as they are not really related to the performance regression. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23204: Revert "[SPARK-21052][SQL] Add hash map metrics to join"
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23204 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23186: [SPARK-26230][SQL]FileIndex: if case sensitive, validate...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23186 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23203: [SPARK-26252][PYTHON] Add support to run specific unitte...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23203 I used to run pyspark tests via `python python/pyspark/sql/dataframe.py`, after setting `export PYTHONPATH="$(find "${SPARK_HOME}"/python/lib/ -name 'py4j-*-src.zip' -type f | uniq)":"${SPARK_HOME}"/python`. I'm happy to see an easier way to do it, though I'm not very familiar with these scripts. Thanks for doing this! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23120: [SPARK-26151][SQL] Return partial results for bad CSV re...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23120 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23010: [SPARK-26012][SQL]Null and '' values should not cause dy...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23010 The root cause is that `DynamicPartitionDataWriter` treats null and empty string as different partition values and creates new files for each. However, both null and empty string are converted to `__HIVE_DEFAULT_PARTITION__` at the end. I think we should deal with invalid partition values ahead of time, so that we don't need to worry about them during writing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
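A small Scala sketch of "dealing with invalid partition values ahead of time" (a hypothetical helper; `__HIVE_DEFAULT_PARTITION__` is the real Hive sentinel): normalize before the writer sees the values, so null and the empty string can no longer look like two distinct partitions that later collapse into one.

    val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"

    // Both null and "" map to the same sentinel up front.
    def normalizePartitionValue(raw: String): String =
      if (raw == null || raw.isEmpty) DEFAULT_PARTITION_NAME else raw

    assert(normalizePartitionValue(null) == normalizePartitionValue(""))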
[GitHub] spark issue #23190: [SPARK-26117][FOLLOW-UP][SQL]throw SparkOutOfMemoryError...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23190 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22512#discussion_r238172470 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala --- @@ -148,12 +156,21 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext { }) // When we are regenerating the golden files we don't need to run all the configs as they // all need to return the same result - if (regenerateGoldenFiles && configs.nonEmpty) { + if (regenerateGoldenFiles) { configs.take(1) --- End diff -- what if `configs` is empty? `take(1)` will fail --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23186: [SPARK-26230][SQL]FileIndex: if case sensitive, validate...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23186 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23120: [SPARK-26151][SQL] Return partial results for bad CSV re...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23120 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23120: [SPARK-26151][SQL] Return partial results for bad...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23120#discussion_r238083349 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala --- @@ -243,21 +243,27 @@ class UnivocityParser( () => getPartialResult(), new RuntimeException("Malformed CSV record")) } else { - try { -// When the length of the returned tokens is identical to the length of the parsed schema, -// we just need to convert the tokens that correspond to the required columns. -var i = 0 -while (i < requiredSchema.length) { + // When the length of the returned tokens is identical to the length of the parsed schema, + // we just need to convert the tokens that correspond to the required columns. + var badRecordException: Option[Throwable] = None + var i = 0 + while (i < requiredSchema.length) { +try { row(i) = valueConverters(i).apply(getToken(tokens, i)) - i += 1 +} catch { + case NonFatal(e) => +badRecordException = badRecordException.orElse(Some(e)) } +i += 1 + } + + if (badRecordException.isEmpty) { row - } catch { -case NonFatal(e) => - // For corrupted records with the number of tokens same as the schema, - // CSV reader doesn't support partial results. All fields other than the field - // configured by `columnNameOfCorruptRecord` are set to `null`. - throw BadRecordException(() => getCurrentInput, () => None, e) + } else { +// For corrupted records with the number of tokens same as the schema, +// CSV reader doesn't support partial results. All fields other than the field +// configured by `columnNameOfCorruptRecord` are set to `null`. --- End diff -- what do you mean here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
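For context, the user-visible behavior this PR refines (the path and data here are made up): in `PERMISSIVE` mode, a malformed line such as `3,oops` should now produce the partial row `[3, null]` plus the raw record in the corrupt-record column, instead of an all-null row.

    // Assumes `spark` is an active SparkSession.
    val df = spark.read
      .schema("i INT, j INT, _corrupt_record STRING")
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "_corrupt_record")
      .csv("/tmp/example.csv")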
[GitHub] spark issue #23154: [SPARK-26195][SQL] Correct exception messages in some cl...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23154 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23178: [SPARK-26216][SQL] Do not use case class as publi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23178#discussion_r238083055 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala --- @@ -38,114 +38,106 @@ import org.apache.spark.sql.types.DataType * @since 1.3.0 */ @Stable --- End diff -- It's not a new API anyway; it would be weird to change the `since` version to 3.0. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23178: [SPARK-26216][SQL] Do not use case class as public API (...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23178 thanks for the review, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23130 We don't need to block it, but @MaxGekk if you have time, it would be great to answer https://github.com/apache/spark/pull/23130#issuecomment-442491582 . Thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23190: [MINOR][SQL]throw SparkOutOfMemoryError instead of SparkE...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23190 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23190: [MINOR][SQL]throw SparkOutOfMemoryError instead of SparkE...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23190 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23187: [SPARK-26211][SQL][TEST][FOLLOW-UP] Combine test cases f...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23187 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23153: [SPARK-26147][SQL] only pull out unevaluable pyth...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23153#discussion_r238082589 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -155,19 +155,20 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { } /** - * PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF - * and pull them out from join condition. For python udf accessing attributes from only one side, - * they are pushed down by operation push down rules. If not (e.g. user disables filter push - * down rules), we need to pull them out in this rule too. + * PythonUDF in join condition can't be evaluated if it refers to attributes from both join sides. + * See `ExtractPythonUDFs` for details. This rule will detect un-evaluable PythonUDF and pull them + * out from join condition. */ object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper { - def hasPythonUDF(expression: Expression): Boolean = { -expression.collectFirst { case udf: PythonUDF => udf }.isDefined + + private def hasUnevaluablePythonUDF(expr: Expression, j: Join): Boolean = { +expr.find { e => + PythonUDF.isScalarPythonUDF(e) && !canEvaluate(e, j.left) && !canEvaluate(e, j.right) --- End diff -- It's only possible to have a scalar UDF in a join condition, so changing it to `e.isInstanceOf[PythonUDF]` is the same. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
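For readers unfamiliar with `canEvaluate`: it holds when an expression references only attributes produced by one plan, which is why a Python UDF touching columns from both join sides fails it for either side and must be pulled out of the join condition. A condensed sketch of the semantics (simplified from Catalyst's `PredicateHelper`):

    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // True iff every attribute `expr` references is in `plan`'s output.
    def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
      expr.references.subsetOf(plan.outputSet)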
[GitHub] spark pull request #23176: [SPARK-26211][SQL] Fix InSet for binary, and stru...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23176#discussion_r237770687 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala --- @@ -293,6 +293,54 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper { } } + test("INSET: binary") { --- End diff -- good idea! we should test `In` and `InSet` together --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r237756348 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala --- @@ -95,9 +77,98 @@ case class CreateHiveTableAsSelectCommand( Seq.empty[Row] } + def getDataWritingCommand( --- End diff -- I feel it's better to have 2 methods: `writingCommandForExistingTable`, `writingCommandForNewTable` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
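A sketch of the shape this suggestion implies (the two method names come from the comment above; everything else is illustrative, not the actual Spark code):

    import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}
    import org.apache.spark.sql.execution.command.DataWritingCommand

    trait CreateHiveTableAsSelectBase {
      // Instead of one getDataWritingCommand(catalog, tableDesc, tableExists),
      // expose two intention-revealing variants:
      def writingCommandForExistingTable(
          catalog: SessionCatalog,
          tableDesc: CatalogTable): DataWritingCommand

      def writingCommandForNewTable(
          catalog: SessionCatalog,
          tableDesc: CatalogTable): DataWritingCommand
    }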
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r237756394 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala --- @@ -95,9 +77,98 @@ case class CreateHiveTableAsSelectCommand( Seq.empty[Row] } + def getDataWritingCommand( +catalog: SessionCatalog, +tableDesc: CatalogTable, +tableExists: Boolean): DataWritingCommand + override def argString: String = { s"[Database:${tableDesc.database}, " + s"TableName: ${tableDesc.identifier.table}, " + s"InsertIntoHiveTable]" } } + +/** + * Create table and insert the query result into it. + * + * @param tableDesc the Table Describe, which may contain serde, storage handler etc. + * @param query the query whose result will be insert into the new relation + * @param mode SaveMode + */ +case class CreateHiveTableAsSelectCommand( +tableDesc: CatalogTable, +query: LogicalPlan, +outputColumnNames: Seq[String], +mode: SaveMode) + extends CreateHiveTableAsSelectBase { + + override def getDataWritingCommand( + catalog: SessionCatalog, + tableDesc: CatalogTable, + tableExists: Boolean): DataWritingCommand = { +if (tableExists) { + InsertIntoHiveTable( +tableDesc, +Map.empty, +query, +overwrite = false, +ifPartitionNotExists = false, +outputColumnNames = outputColumnNames) +} else { + // For CTAS, there is no static partition values to insert. + val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap + InsertIntoHiveTable( +tableDesc, +partition, +query, +overwrite = true, +ifPartitionNotExists = false, +outputColumnNames = outputColumnNames) +} + } +} + +/** + * Create table and insert the query result into it. This creates Hive table but inserts + * the query result into it by using data source. + * + * @param tableDesc the Table Describe, which may contain serde, storage handler etc. --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r237753623 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala --- @@ -95,9 +77,98 @@ case class CreateHiveTableAsSelectCommand( Seq.empty[Row] } + def getDataWritingCommand( +catalog: SessionCatalog, +tableDesc: CatalogTable, +tableExists: Boolean): DataWritingCommand + override def argString: String = { s"[Database:${tableDesc.database}, " + s"TableName: ${tableDesc.identifier.table}, " + s"InsertIntoHiveTable]" } } + +/** + * Create table and insert the query result into it. + * + * @param tableDesc the Table Describe, which may contain serde, storage handler etc. --- End diff -- `table description` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r237753433 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala --- @@ -95,9 +77,98 @@ case class CreateHiveTableAsSelectCommand( Seq.empty[Row] } + def getDataWritingCommand( +catalog: SessionCatalog, +tableDesc: CatalogTable, +tableExists: Boolean): DataWritingCommand + override def argString: String = { s"[Database:${tableDesc.database}, " + s"TableName: ${tableDesc.identifier.table}, " + s"InsertIntoHiveTable]" } } + +/** + * Create table and insert the query result into it. + * + * @param tableDesc the Table Describe, which may contain serde, storage handler etc. + * @param query the query whose result will be insert into the new relation + * @param mode SaveMode + */ +case class CreateHiveTableAsSelectCommand( +tableDesc: CatalogTable, +query: LogicalPlan, +outputColumnNames: Seq[String], +mode: SaveMode) + extends CreateHiveTableAsSelectBase { + + override def getDataWritingCommand( + catalog: SessionCatalog, + tableDesc: CatalogTable, + tableExists: Boolean): DataWritingCommand = { +if (tableExists) { + InsertIntoHiveTable( +tableDesc, +Map.empty, +query, +overwrite = false, +ifPartitionNotExists = false, +outputColumnNames = outputColumnNames) +} else { + // For CTAS, there is no static partition values to insert. + val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap + InsertIntoHiveTable( +tableDesc, +partition, +query, +overwrite = true, +ifPartitionNotExists = false, +outputColumnNames = outputColumnNames) +} + } +} + +/** + * Create table and insert the query result into it. This creates Hive table but inserts + * the query result into it by using data source. + * + * @param tableDesc the Table Describe, which may contain serde, storage handler etc. + * @param query the query whose result will be insert into the new relation + * @param mode SaveMode + */ +case class CreateHiveTableAsSelectWithDataSourceCommand( --- End diff -- `OptimizedCreateHiveTableAsSelectCommand`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22957: [SPARK-25951][SQL] Ignore aliases for distributions and ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22957 LGTM, cc @viirya as well --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org