[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237745005 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala --- @@ -780,6 +780,23 @@ class PlannerSuite extends SharedSQLContext { classOf[PartitioningCollection]) } } + + test("SPARK-25951: avoid redundant shuffle on rename") { --- End diff -- can we have an end-to-end test as well? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23165: [SPARK-26188][SQL] FileIndex: don't infer data types of ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23165 thanks, merging to master/2.4! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23031: [SPARK-26060][SQL] Track SparkConf entries and make SET ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23031 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237734357 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.sources.v2.SupportsBatchRead; +import org.apache.spark.sql.sources.v2.Table; + +/** + * A logical representation of a data source scan. This interface is used to provide logical + * information, like what the actual read schema is. + * + * This logical representation is shared between batch scan, micro-batch streaming scan and + * continuous streaming scan. Data sources must implement the corresponding methods in this + * interface, to match what the table promises to support. For example, {@link #toBatch()} must be + * implemented, if the {@link Table} that creates this {@link Scan} implements + * {@link SupportsBatchRead}. + * + */ +@Evolving +public interface Scan { + + /** + * Returns the actual schema of this data source scan, which may be different from the physical + * schema of the underlying storage, as column pruning or other optimizations may happen. + */ + StructType readSchema(); + + /** + * A description string of this scan, which may includes information like: what filters are + * configured for this scan, what's the value of some important options like path, etc. The + * description doesn't need to include {@link #readSchema()}, as Spark already knows it. + * + * By default this returns the class name of the implementation. Please override it to provide a + * meaningful description. + * + */ + default String description() { --- End diff -- Since this is an interface, and filter pushdown is optional, I'm not sure how to report `pushedFilters` here. The read schema is always reported, see `DataSourceV2ScanExec.simpleString`. Maybe we should still keep `pushedFilters` in `DataSourceV2ScanExec`, and display it in the plan string format. What do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
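For illustration, a hypothetical sketch of the first option (surfacing pushed filters through `description()`). The class name, constructor parameters, and the assumption that `readSchema()` and `description()` are the only methods that need overriding are illustrative, not taken from the PR:

```scala
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.Scan
import org.apache.spark.sql.types.StructType

// Hypothetical Scan implementation that reports its pushed filters in the
// description string, so the plan output can show them even though the Scan
// interface itself has no notion of filter pushdown.
class MyFileScan(schema: StructType, pushedFilters: Seq[Filter]) extends Scan {
  override def readSchema(): StructType = schema

  override def description(): String =
    s"MyFileScan, pushed filters: ${pushedFilters.mkString("[", ", ", "]")}"
}
```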
[GitHub] spark issue #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of `Colum...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23152 good catch! Like @liancheng said, this is a real bug (a numeric column with all values null), so it's better to have an end-to-end test case for it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
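A rough sketch of what such an end-to-end test could look like; the table name, inserted values, and exact assertion are illustrative, and the real test would live in a statistics-related suite:

```scala
// Hypothetical end-to-end test: filter estimation with CBO enabled must not
// fail on a numeric column whose values are all null.
test("SPARK-26181: column stats on an all-null numeric column") {
  withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
    withTable("t") {
      sql("CREATE TABLE t (c1 INT) USING parquet")
      sql("INSERT INTO t VALUES (null), (null)")
      sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS c1")
      // The filter forces FilterEstimation to look at c1's (empty) min/max stats.
      checkAnswer(sql("SELECT * FROM t WHERE c1 > 0"), Nil)
    }
  }
}
```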
[GitHub] spark pull request #23151: [SPARK-26180][CORE][TEST] Add a withCreateTempDir...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23151#discussion_r237732426 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala --- @@ -66,6 +66,18 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with } } + /** + * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` + * returns. + * + */ + protected override def withTempDir(f: File => Unit): Unit = { +super.withTempDir { dir => --- End diff -- yea this is what I expect, thanks for doing it! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23151: [SPARK-26180][CORE][TEST] Add a withCreateTempDir...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23151#discussion_r237732369 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala --- @@ -66,6 +66,18 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with } } + /** + * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` + * returns. + * --- End diff -- nit: unnecessary blank line --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23165: [SPARK-26188][SQL] FileIndex: don't infer data ty...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23165#discussion_r237532982 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala --- @@ -147,13 +163,13 @@ object PartitioningUtils { columnNames.zip(literals).map { case (name, Literal(_, dataType)) => // We always assume partition columns are nullable since we've no idea whether null values // will be appended in the future. - StructField(name, dataType, nullable = true) + StructField(name, userSpecifiedDataTypes.getOrElse(name, dataType), nullable = true) } } // Finally, we create `Partition`s based on paths and resolved partition values. val partitions = resolvedPartitionValues.zip(pathsWithPartitionValues).map { -case (PartitionValues(_, literals), (path, _)) => +case (PartitionValues(columnNames, literals), (path, _)) => --- End diff -- unnecessary change? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237532156 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2542,10 +2542,10 @@ object EliminateUnions extends Rule[LogicalPlan] { * rule can't work for those parameters. */ object CleanupAliases extends Rule[LogicalPlan] { - private def trimAliases(e: Expression): Expression = { + private[catalyst] def trimAliases(e: Expression): Expression = { e.transformDown { - case Alias(child, _) => child - case MultiAlias(child, _) => child + case Alias(child, _) => trimAliases(child) --- End diff -- it's `transformDown`, why doesn't it work? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23178: [SPARK-26216][SQL] Do not use case class as publi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23178#discussion_r237522717 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala --- @@ -38,114 +38,108 @@ import org.apache.spark.sql.types.DataType * @since 1.3.0 */ @Stable -case class UserDefinedFunction protected[sql] ( -f: AnyRef, -dataType: DataType, -inputTypes: Option[Seq[DataType]]) { - - private var _nameOption: Option[String] = None - private var _nullable: Boolean = true - private var _deterministic: Boolean = true - - // This is a `var` instead of in the constructor for backward compatibility of this case class. - // TODO: revisit this case class in Spark 3.0, and narrow down the public surface. - private[sql] var nullableTypes: Option[Seq[Boolean]] = None +trait UserDefinedFunction { --- End diff -- good idea! though I'm not sure if `sealed` works for Java. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237519971 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2542,10 +2542,10 @@ object EliminateUnions extends Rule[LogicalPlan] { * rule can't work for those parameters. */ object CleanupAliases extends Rule[LogicalPlan] { - private def trimAliases(e: Expression): Expression = { + private[catalyst] def trimAliases(e: Expression): Expression = { e.transformDown { - case Alias(child, _) => child - case MultiAlias(child, _) => child + case Alias(child, _) => trimAliases(child) --- End diff -- what's going on here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts,...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23171#discussion_r237519018 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala --- @@ -335,6 +343,41 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate { """.stripMargin) } + private def genCodeWithSwitch(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val (nullLiterals, nonNullLiterals) = list.partition { + case Literal(null, _) => true + case _ => false +} --- End diff -- maybe we can follow `InSet`, define a `hasNull` flag ahead of time, and filter out null values from the list before processing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
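A minimal sketch of that suggestion (the names follow `InSet`'s style and are illustrative, not the PR's code): compute a `hasNull` flag once and work on a null-free literal list afterwards.

```scala
// Sketch only: track whether the IN list contains a null literal, and keep a
// separate list with null literals filtered out for the switch-based codegen.
private lazy val hasNull: Boolean = list.exists {
  case Literal(null, _) => true
  case _ => false
}

private lazy val nonNullLiterals: Seq[Expression] = list.filterNot {
  case Literal(null, _) => true
  case _ => false
}
```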
[GitHub] spark issue #23176: [SPARK-26211][SQL] Fix InSet for binary, and struct and ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23176 thanks, merging to master/2.4/2.3! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23165: [SPARK-26188][SQL] FileIndex: don't infer data ty...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23165#discussion_r237510705 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala --- @@ -94,18 +94,34 @@ object PartitioningUtils { paths: Seq[Path], typeInference: Boolean, basePaths: Set[Path], + userSpecifiedSchema: Option[StructType], + caseSensitive: Boolean, timeZoneId: String): PartitionSpec = { -parsePartitions(paths, typeInference, basePaths, DateTimeUtils.getTimeZone(timeZoneId)) +parsePartitions(paths, typeInference, basePaths, userSpecifiedSchema, + caseSensitive, DateTimeUtils.getTimeZone(timeZoneId)) } private[datasources] def parsePartitions( paths: Seq[Path], typeInference: Boolean, basePaths: Set[Path], + userSpecifiedSchema: Option[StructType], + caseSensitive: Boolean, timeZone: TimeZone): PartitionSpec = { +val userSpecifiedDataTypes = if (userSpecifiedSchema.isDefined) { --- End diff -- can we build this at the caller side out of `PartitioningUtils`? Then we only need one extra parameter. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
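A sketch of what building the map at the caller side could look like (assuming the caller already has `userSpecifiedSchema` in scope), so `parsePartitions` only needs a single extra `userSpecifiedDataTypes` parameter:

```scala
// Sketch: derive the column-name -> DataType map once, outside PartitioningUtils.
val userSpecifiedDataTypes: Map[String, DataType] =
  userSpecifiedSchema
    .map(_.fields.map(f => f.name -> f.dataType).toMap)
    .getOrElse(Map.empty)
```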
[GitHub] spark pull request #23165: [SPARK-26188][SQL] FileIndex: don't infer data ty...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23165#discussion_r237510162 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala --- @@ -250,7 +276,13 @@ object PartitioningUtils { val rawColumnValue = columnSpec.drop(equalSignIndex + 1) assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'") - val literal = inferPartitionColumnValue(rawColumnValue, typeInference, timeZone) + val literal = if (userSpecifiedDataTypes.contains(columnName)) { +// SPARK-26188: if user provides corresponding column schema, process the column as String +// type and cast it as user specified data type later. --- End diff -- can we do the cast here? It's a good practice to put related code together. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
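Roughly what doing the cast in place could look like, as a sketch rather than the PR's code; it assumes `timeZone` and `userSpecifiedDataTypes` are in scope as in the surrounding method, and path unescaping is omitted for brevity:

```scala
// Sketch: parse the raw value as a string and cast it to the user-specified
// type right here, instead of deferring the cast to a later step.
val literal = if (userSpecifiedDataTypes.contains(columnName)) {
  val dataType = userSpecifiedDataTypes(columnName)
  val castedValue = Cast(Literal(rawColumnValue), dataType, Option(timeZone.getID)).eval()
  Literal.create(castedValue, dataType)
} else {
  inferPartitionColumnValue(rawColumnValue, typeInference, timeZone)
}
```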
[GitHub] spark pull request #23165: [SPARK-26188][SQL] FileIndex: don't infer data ty...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23165#discussion_r237508120 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala --- @@ -126,33 +126,15 @@ abstract class PartitioningAwareFileIndex( val caseInsensitiveOptions = CaseInsensitiveMap(parameters) val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION) .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone) -val inferredPartitionSpec = PartitioningUtils.parsePartitions( + +val caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis +PartitioningUtils.parsePartitions( leafDirs, typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled, basePaths = basePaths, + userSpecifiedSchema = userSpecifiedSchema, + caseSensitive = caseSensitive, timeZoneId = timeZoneId) -userSpecifiedSchema match { - case Some(userProvidedSchema) if userProvidedSchema.nonEmpty => -val userPartitionSchema = - combineInferredAndUserSpecifiedPartitionSchema(inferredPartitionSpec) --- End diff -- we can remove `combineInferredAndUserSpecifiedPartitionSchema` now --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23132: [SPARK-26163][SQL] Parsing decimals from JSON using loca...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23132 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23178: [SPARK-26216][SQL] Do not use case class as publi...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/23178 [SPARK-26216][SQL] Do not use case class as public API (UserDefinedFunction) ## What changes were proposed in this pull request? It's a bad idea to use a case class as a public API, as it has a very wide surface: the `copy` method, its fields, the companion object, etc. For a particular case, `UserDefinedFunction`: it has a private constructor, and I believe we only want users to access a few methods: `apply`, `nullable`, `asNonNullable`, etc. However, all its fields, its `copy` method, and the companion object are unexpectedly public. As a result, we made many tricks to work around the binary compatibility issues. This PR proposes to only make interfaces public, and hide the implementations behind private classes. Now `UserDefinedFunction` is a pure trait, and the concrete implementation is `SparkUserDefinedFunction`, which is private. This is the first PR to go in this direction. If it's accepted, I'll create an umbrella JIRA and fix all the public case classes. ## How was this patch tested? Existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark udf Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23178.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23178 commit 700334f3b14cfe88d6141c8a99ec339ec7a16afc Author: Wenchen Fan Date: 2018-11-29T13:38:51Z Do not use case class as public API (UserDefinedFunction) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23178: [SPARK-26216][SQL] Do not use case class as public API (...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23178 cc @rxin @srowen @gatorsmile @HyukjinKwon @dongjoon-hyun --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23176: [SPARK-26211][SQL] Fix InSet for binary, and struct and ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23176 good catch! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23176: [SPARK-26211][SQL] Fix InSet for binary, and stru...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23176#discussion_r237383211 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala --- @@ -293,6 +293,54 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper { } } + test("INSET: binary") { +val hS = HashSet[Any]() + Array(1.toByte, 2.toByte) + Array(3.toByte) +val nS = HashSet[Any]() + Array(1.toByte, 2.toByte) + Array(3.toByte) + null +val onetwo = Literal(Array(1.toByte, 2.toByte)) +val three = Literal(Array(3.toByte)) +val threefour = Literal(Array(3.toByte, 4.toByte)) +val nl = Literal(null, onetwo.dataType) +checkEvaluation(InSet(onetwo, hS), true) +checkEvaluation(InSet(three, hS), true) +checkEvaluation(InSet(three, nS), true) --- End diff -- this line is duplicated --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23176: [SPARK-26211][SQL] Fix InSet for binary, and stru...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23176#discussion_r237382990 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala --- @@ -367,11 +367,29 @@ case class InSet(child: Expression, hset: Set[Any]) extends UnaryExpression with } @transient lazy val set: Set[Any] = child.dataType match { -case _: AtomicType => hset +case t: AtomicType if !t.isInstanceOf[BinaryType] => hset case _: NullType => hset case _ => + val ord = TypeUtils.getInterpretedOrdering(child.dataType) + val ordering = if (hasNull) { +new Ordering[Any] { + override def compare(x: Any, y: Any): Int = { +if (x == null && y == null) { + 0 +} else if (x == null) { + -1 +} else if (y == null) { + 1 +} else { + ord.compare(x, y) +} + } +} + } else { +ord + } // for structs use interpreted ordering to be able to compare UnsafeRows with non-UnsafeRows - TreeSet.empty(TypeUtils.getInterpretedOrdering(child.dataType)) ++ hset + TreeSet.empty(ordering) ++ hset --- End diff -- and update eval to ``` if (value == null) { null } else if (set.contains(value)) { true } else if (hasNull) { null } else { false } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
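A sketch of the suggested `eval`, assuming null literals are kept out of `set` and a `hasNull` flag records whether the original IN list contained a null:

```scala
// Sketch only: three-valued logic for IN once null literals are excluded
// from the lookup set.
override def eval(input: InternalRow): Any = {
  val value = child.eval(input)
  if (value == null) {
    null                       // null input -> unknown
  } else if (set.contains(value)) {
    true                       // definite match
  } else if (hasNull) {
    null                       // no match, but a null literal means "unknown"
  } else {
    false                      // no match and no null literals
  }
}
```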
[GitHub] spark pull request #23176: [SPARK-26211][SQL] Fix InSet for binary, and stru...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23176#discussion_r237382322 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala --- @@ -367,11 +367,29 @@ case class InSet(child: Expression, hset: Set[Any]) extends UnaryExpression with } @transient lazy val set: Set[Any] = child.dataType match { -case _: AtomicType => hset +case t: AtomicType if !t.isInstanceOf[BinaryType] => hset case _: NullType => hset case _ => + val ord = TypeUtils.getInterpretedOrdering(child.dataType) + val ordering = if (hasNull) { +new Ordering[Any] { + override def compare(x: Any, y: Any): Int = { +if (x == null && y == null) { + 0 +} else if (x == null) { + -1 +} else if (y == null) { + 1 +} else { + ord.compare(x, y) +} + } +} + } else { +ord + } // for structs use interpreted ordering to be able to compare UnsafeRows with non-UnsafeRows - TreeSet.empty(TypeUtils.getInterpretedOrdering(child.dataType)) ++ hset + TreeSet.empty(ordering) ++ hset --- End diff -- shall we just filter out nulls when building the tree set? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
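A minimal sketch of that alternative: drop the null before building the ordered set, and rely on a `hasNull` flag (as in the previous comment) for the three-valued-logic handling.

```scala
// Sketch only: build the interpreted-ordering TreeSet from the non-null values.
TreeSet.empty(TypeUtils.getInterpretedOrdering(child.dataType)) ++ hset.filter(_ != null)
```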
[GitHub] spark pull request #23031: [SPARK-26060][SQL] Track SparkConf entries and ma...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23031#discussion_r237364466 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1610,6 +1610,14 @@ object SQLConf { """ "... N more fields" placeholder.""") .intConf .createWithDefault(25) + + val SET_COMMAND_REJECTS_SPARK_CONFS = +buildConf("spark.sql.execution.setCommandRejectsSparkConfs") --- End diff -- shall we use the legacy prefix? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23151: [SPARK-26180][CORE][TEST] Add a withCreateTempDir...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23151#discussion_r237364160 --- Diff: core/src/test/scala/org/apache/spark/SparkFunSuite.scala --- @@ -105,5 +105,16 @@ abstract class SparkFunSuite logInfo(s"\n\n= FINISHED $shortSuiteName: '$testName' =\n") } } - + /** + * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` + * returns. + * + * @todo Probably this method should be moved to a more general place + */ + protected def withCreateTempDir(f: File => Unit): Unit = { --- End diff -- I'm not talking about details like which class to override, just the idea. Why wouldn't overriding work? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22514: [SPARK-25271][SQL] Hive ctas commands should use data so...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22514 can we try a query and see what the SQL UI looks like? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r237363826 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala --- @@ -92,4 +92,18 @@ class HiveParquetSuite extends QueryTest with ParquetTest with TestHiveSingleton } } } + + test("SPARK-25271: write empty map into hive parquet table") { --- End diff -- +1 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r237361155 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala --- @@ -181,62 +180,39 @@ case class RelationConversions( conf: SQLConf, sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] { private def isConvertible(relation: HiveTableRelation): Boolean = { -val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) -serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) || - serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC) +isConvertible(relation.tableMeta) } - // Return true for Apache ORC and Hive ORC-related configuration names. - // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`. - private def isOrcProperty(key: String) = -key.startsWith("orc.") || key.contains(".orc.") - - private def isParquetProperty(key: String) = -key.startsWith("parquet.") || key.contains(".parquet.") - - private def convert(relation: HiveTableRelation): LogicalRelation = { -val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) - -// Consider table and storage properties. For properties existing in both sides, storage -// properties will supersede table properties. -if (serde.contains("parquet")) { - val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++ -relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA -> - conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString) - sessionCatalog.metastoreCatalog -.convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet") -} else { - val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++ -relation.tableMeta.storage.properties - if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") { -sessionCatalog.metastoreCatalog.convertToLogicalRelation( - relation, - options, - classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat], - "orc") - } else { -sessionCatalog.metastoreCatalog.convertToLogicalRelation( - relation, - options, - classOf[org.apache.spark.sql.hive.orc.OrcFileFormat], - "orc") - } -} + private def isConvertible(tableMeta: CatalogTable): Boolean = { +val serde = tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) +serde.contains("parquet") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) || + serde.contains("orc") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC) } + private val metastoreCatalog = sessionCatalog.metastoreCatalog + override def apply(plan: LogicalPlan): LogicalPlan = { plan resolveOperators { // Write path case InsertIntoTable(r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists) // Inserting into partitioned table is not supported in Parquet/Orc data source (yet). 
if query.resolved && DDLUtils.isHiveTable(r.tableMeta) && !r.isPartitioned && isConvertible(r) => -InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists) +InsertIntoTable(metastoreCatalog.convert(r), partition, + query, overwrite, ifPartitionNotExists) // Read path case relation: HiveTableRelation if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) => -convert(relation) +metastoreCatalog.convert(relation) + + // CTAS + case CreateTable(tableDesc, mode, Some(query)) + if DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty && +isConvertible(tableDesc) => +DDLUtils.checkDataColNames(tableDesc) --- End diff -- why do we need this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23151: [SPARK-26180][CORE][TEST] Add a withCreateTempDir...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23151#discussion_r237347450 --- Diff: core/src/test/scala/org/apache/spark/SparkFunSuite.scala --- @@ -105,5 +105,16 @@ abstract class SparkFunSuite logInfo(s"\n\n= FINISHED $shortSuiteName: '$testName' =\n") } } - + /** + * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` + * returns. + * + * @todo Probably this method should be moved to a more general place + */ + protected def withCreateTempDir(f: File => Unit): Unit = { --- End diff -- OK, how about in `SQLTestUtils` we override `withTempDir` with this extra logic? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
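A sketch of what that override could look like in `SQLTestUtils`, assuming the suite's existing `waitForTasksToFinish()` helper (compare the diff that was eventually added in this PR):

```scala
// Sketch: reuse the base-class temp-dir handling, but wait for outstanding
// tasks before the directory is cleaned up, as SQL tests need.
protected override def withTempDir(f: File => Unit): Unit = {
  super.withTempDir { dir =>
    f(dir)
    waitForTasksToFinish()
  }
}
```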
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237346499 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala --- @@ -22,86 +22,56 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.catalyst.plans.physical.SinglePartition +import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} -import org.apache.spark.sql.execution.streaming.continuous._ -import org.apache.spark.sql.sources.v2.DataSourceV2 import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousReadSupport, MicroBatchReadSupport} /** - * Physical plan node for scanning data from a data source. + * Physical plan node for scanning a batch of data from a data source. */ case class DataSourceV2ScanExec( output: Seq[AttributeReference], -@transient source: DataSourceV2, -@transient options: Map[String, String], -@transient pushedFilters: Seq[Expression], -@transient readSupport: ReadSupport, -@transient scanConfig: ScanConfig) - extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan { +scanDesc: String, +@transient batch: Batch) --- End diff -- @rdblue I want to reuse this plan for batch and microbatch. Here this plan doesn't take `Scan` but just `Batch`, so that the caller side is flexible to decide how to produce batch(es) from a scan. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237346029 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; --- End diff -- for other reviewers: in the ds v2 community sync, we decided to move data source v2 into a new module `sql-api`, and make catalyst depend on it. This will be done in a followup. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237346128 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.DataSourceRegister; +import org.apache.spark.sql.types.StructType; + +/** + * The base interface for v2 data sources which don't have a real catalog. Implementations must + * have a public, 0-arg constructor. + * + * The major responsibility of this interface is to return a {@link Table} for read/write. + */ +@Evolving +// TODO: do not extend `DataSourceV2`, after we finish the API refactor completely. +public interface TableProvider extends DataSourceV2 { + + /** + * Return a {@link Table} instance to do read/write with user-specified options. + * + * @param options the user-specified options that can identify a table, e.g. file path, Kafka + *topic name, etc. It's an immutable case-insensitive string-to-string map. + */ + Table getTable(DataSourceOptions options); + + /** + * Return a {@link Table} instance to do read/write with user-specified schema and options. + * + * By default this method throws {@link UnsupportedOperationException}, implementations should --- End diff -- added the throw clause. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237342899 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.v2.reader.Scan; +import org.apache.spark.sql.sources.v2.reader.ScanBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * An interface representing a logical structured data set of a data source. For example, the + * implementation can be a directory on the file system, a topic of Kafka, or a table in the + * catalog, etc. + * + * This interface can mixin the following interfaces to support different operations: + * + * + * {@link SupportsBatchRead}: this table can be read in batch queries. + * + */ +@Evolving +public interface Table { + + /** + * A name to identify this table. + * + * By default this returns the class name of this implementation. Please override it to provide a + * meaningful name, like the database and table name from catalog, or the location of files for + * this table. + * + */ + default String name() { --- End diff -- Do you think it's better to just ask implementations to override `toString`? cc @rdblue --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22957: [SPARK-25951][SQL] Ignore aliases for distributions and ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22957 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23165: [SPARK-26188][SQL] FileIndex: don't infer data ty...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23165#discussion_r237339152 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala --- @@ -49,6 +50,21 @@ class FileIndexSuite extends SharedSQLContext { } } + test("SPARK-26188: don't infer data types of partition columns if user specifies schema") { +withTempDir { dir => + val partitionDirectory = new File(dir, s"a=4d") + partitionDirectory.mkdir() + val file = new File(partitionDirectory, "text.txt") + stringToFile(file, "text") + val path = new Path(dir.getCanonicalPath) + val schema = StructType(Seq(StructField("a", StringType, false))) + val catalog = new InMemoryFileIndex(spark, Seq(path), Map.empty, Some(schema)) --- End diff -- `catalog` ->`fileIndex` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23165: [SPARK-26188][SQL] FileIndex: don't infer data ty...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23165#discussion_r237339038 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala --- @@ -126,13 +126,14 @@ abstract class PartitioningAwareFileIndex( val caseInsensitiveOptions = CaseInsensitiveMap(parameters) val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION) .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone) -val inferredPartitionSpec = PartitioningUtils.parsePartitions( - leafDirs, - typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled, - basePaths = basePaths, - timeZoneId = timeZoneId) + userSpecifiedSchema match { case Some(userProvidedSchema) if userProvidedSchema.nonEmpty => +val inferredPartitionSpec = PartitioningUtils.parsePartitions( + leafDirs, + typeInference = false, --- End diff -- can you add some comment, so that we don't make the same mistake in the future? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23151: [SPARK-26180][CORE][TEST] Add a withCreateTempDir...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23151#discussion_r237338904 --- Diff: core/src/test/scala/org/apache/spark/SparkFunSuite.scala --- @@ -105,5 +105,16 @@ abstract class SparkFunSuite logInfo(s"\n\n= FINISHED $shortSuiteName: '$testName' =\n") } } - + /** + * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` + * returns. + * + * @todo Probably this method should be moved to a more general place + */ + protected def withCreateTempDir(f: File => Unit): Unit = { --- End diff -- shall we also do `waitForTasksToFinish` in `withCreateTempDir`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23171: [SPARK-26205][SQL] Optimize In for bytes, shorts, ints
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23171 I'm wondering if this is still useful after we fix the boxing issue in `InSet`. We can write a binary hash set for primitive types, like `LongToUnsafeRowMap`, which should have better performance. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
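A very rough illustration of that idea (not Spark code): an open-addressing set over primitive `Long`s avoids the per-lookup boxing that a `Set[Any]` incurs. Resizing is omitted, so it assumes the capacity is larger than the number of keys.

```scala
// Illustrative only: linear-probing hash set for primitive longs, no boxing.
class LongHashSet(capacity: Int) {
  private val keys = new Array[Long](capacity)
  private val used = new Array[Boolean](capacity)

  private def slot(k: Long): Int = (java.lang.Long.hashCode(k) & 0x7fffffff) % capacity

  def add(k: Long): Unit = {
    var i = slot(k)
    while (used(i) && keys(i) != k) i = (i + 1) % capacity
    keys(i) = k
    used(i) = true
  }

  def contains(k: Long): Boolean = {
    var i = slot(k)
    while (used(i)) {
      if (keys(i) == k) return true
      i = (i + 1) % capacity
    }
    false
  }
}
```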
[GitHub] spark issue #23132: [SPARK-26163][SQL] Parsing decimals from JSON using loca...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23132 LGTM, does CSV need to do the same? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23124 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23130 I think this change makes sense, at least it's good for performance. My only concern is, shall we ask all the parsers to return Nil for empty files? AFAIK JSON doesn't follow it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23130#discussion_r237121289 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala --- @@ -142,4 +144,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog")) } } + + test("skip empty files in non bucketed read") { +withTempDir { dir => + val path = dir.getCanonicalPath + Files.write(Paths.get(path, "empty"), Array.empty[Byte]) + Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8)) + val readback = spark.read.option("wholetext", true).text(path) + + assert(readback.rdd.getNumPartitions === 1) --- End diff -- thanks for pointing it out, I think we are good here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23132: [SPARK-26163][SQL] Parsing decimals from JSON usi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23132#discussion_r237120812 --- Diff: docs/sql-migration-guide-upgrade.md --- @@ -9,6 +9,8 @@ displayTitle: Spark SQL Upgrading Guide ## Upgrading From Spark SQL 2.4 to 3.0 + - In Spark version 2.4 and earlier, accepted format of decimals parsed from JSON is an optional sign ('+' or '-'), followed by a sequence of zero or more decimal digits, optionally followed by a fraction, optionally followed by an exponent. Any commas were removed from the input before parsing. Since Spark 3.0, format varies and depends on locale which can be set via JSON option `locale`. The default locale is `en-US`. To switch back to previous behavior, set `spark.sql.legacy.decimalParsing.enabled` to `true`. --- End diff -- since the default value is `en-US`, can we skip `DecimalFormat` when locale is `en-US`? Then nothing changes by default, and we don't even need a config. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
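A sketch of that proposal, with an illustrative method name and structure: keep the legacy parsing path when the locale is the default `en-US`, and only go through the locale-aware `DecimalFormat` otherwise.

```scala
import java.math.BigDecimal
import java.text.{DecimalFormat, DecimalFormatSymbols}
import java.util.Locale

// Sketch only: en-US keeps the legacy behavior (strip commas, plain BigDecimal),
// while other locales use a locale-aware DecimalFormat.
def parseDecimal(s: String, locale: Locale): BigDecimal = {
  if (locale == Locale.US) {
    new BigDecimal(s.replace(",", ""))
  } else {
    val df = new DecimalFormat("", new DecimalFormatSymbols(locale))
    df.setParseBigDecimal(true)
    df.parse(s).asInstanceOf[BigDecimal]
  }
}
```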
[GitHub] spark pull request #23120: [SPARK-26151][SQL] Return partial results for bad...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23120#discussion_r237118916 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala --- @@ -243,21 +243,27 @@ class UnivocityParser( () => getPartialResult(), new RuntimeException("Malformed CSV record")) } else { - try { -// When the length of the returned tokens is identical to the length of the parsed schema, -// we just need to convert the tokens that correspond to the required columns. -var i = 0 -while (i < requiredSchema.length) { + // When the length of the returned tokens is identical to the length of the parsed schema, + // we just need to convert the tokens that correspond to the required columns. + var badRecordException: Option[Throwable] = None + var i = 0 + while (i < requiredSchema.length) { --- End diff -- I know it's doable for CSV, as the tokens are separated ahead, and we can keep parsing after an exception. Is it also doable for other text based data sources? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
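For reference, the shape of the partial-result approach in the CSV parser, as a condensed sketch of the diff above with illustrative variable names: the loop keeps converting the remaining tokens after a failure and remembers the first exception for the bad-record handler.

```scala
import scala.util.control.NonFatal

// Sketch: convert every required column, null out the ones that fail, and keep
// the first exception so the row can still be reported as a bad record.
var badRecordException: Option[Throwable] = None
var i = 0
while (i < requiredSchema.length) {
  try {
    row(i) = valueConverters(i).apply(tokens(i))
  } catch {
    case NonFatal(e) =>
      badRecordException = badRecordException.orElse(Some(e))
      row.setNullAt(i)
  }
  i += 1
}
```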
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237113069 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala --- @@ -23,85 +23,55 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.catalyst.plans.physical.SinglePartition import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} -import org.apache.spark.sql.execution.streaming.continuous._ import org.apache.spark.sql.sources.v2.DataSourceV2 import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousReadSupport, MicroBatchReadSupport} /** - * Physical plan node for scanning data from a data source. + * Physical plan node for scanning a batch of data from a data source. */ case class DataSourceV2ScanExec( output: Seq[AttributeReference], @transient source: DataSourceV2, @transient options: Map[String, String], @transient pushedFilters: Seq[Expression], -@transient readSupport: ReadSupport, -@transient scanConfig: ScanConfig) +@transient batch: Batch) extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan { override def simpleString: String = "ScanV2 " + metadataString // TODO: unify the equal/hashCode implementation for all data source v2 query plans. override def equals(other: Any): Boolean = other match { -case other: DataSourceV2ScanExec => - output == other.output && readSupport.getClass == other.readSupport.getClass && -options == other.options +case other: DataSourceV2ScanExec => this.batch == other.batch case _ => false } - override def hashCode(): Int = { -Seq(output, source, options).hashCode() - } + override def hashCode(): Int = batch.hashCode() + + private lazy val partitions = batch.planInputPartitions() + + private lazy val readerFactory = batch.createReaderFactory() - override def outputPartitioning: physical.Partitioning = readSupport match { + override def outputPartitioning: physical.Partitioning = batch match { --- End diff -- add back https://github.com/apache/spark/pull/23086#discussion_r236858449 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237111240 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -170,15 +157,24 @@ object DataSourceV2Relation { } def create( - source: DataSourceV2, + provider: TableProvider, + table: SupportsBatchRead, options: Map[String, String], tableIdent: Option[TableIdentifier] = None, userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = { -val readSupport = source.createReadSupport(options, userSpecifiedSchema) -val output = readSupport.fullSchema().toAttributes +val output = table.schema().toAttributes val ident = tableIdent.orElse(tableFromOptions(options)) DataSourceV2Relation( - source, readSupport, output, options, ident, userSpecifiedSchema) + provider, table, output, options, ident, userSpecifiedSchema) + } + + def createRelationForWrite( --- End diff -- done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237111058 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -40,8 +40,8 @@ import org.apache.spark.sql.types.StructType * @param userSpecifiedSchema The user-specified schema for this scan. */ case class DataSourceV2Relation( -source: DataSourceV2, -readSupport: BatchReadSupport, +source: TableProvider, --- End diff -- done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237110747 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala --- @@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils -import org.apache.spark.sql.sources.v2.{BatchReadSupportProvider, DataSourceOptions, DataSourceV2} +import org.apache.spark.sql.sources.v2._ --- End diff -- It's the IDE that turns it into a wildcard, because the import line gets too long. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237103452 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala --- @@ -54,27 +53,17 @@ case class DataSourceV2ScanExec( Seq(output, source, options).hashCode() } - override def outputPartitioning: physical.Partitioning = readSupport match { + override def outputPartitioning: physical.Partitioning = scan match { --- End diff -- filter pushdown happens at the planning phase, so the physical plan is the only place users can know which filters are pushed. Shall we keep `pushedFilters` in the scan node? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237092275 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala --- @@ -23,29 +23,28 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.catalyst.plans.physical.SinglePartition import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} -import org.apache.spark.sql.execution.streaming.continuous._ import org.apache.spark.sql.sources.v2.DataSourceV2 import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousReadSupport, MicroBatchReadSupport} /** - * Physical plan node for scanning data from a data source. + * Physical plan node for scanning a batch of data from a data source. */ case class DataSourceV2ScanExec( output: Seq[AttributeReference], @transient source: DataSourceV2, @transient options: Map[String, String], --- End diff -- `source` and `options` are also used to define the string format of this plan, as it extends `DataSourceV2StringFormat`. Maybe we don't need a pretty string format for physical scan node? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237080333 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.DataSourceRegister; +import org.apache.spark.sql.types.StructType; + +/** + * The base interface for v2 data sources which don't have a real catalog. Implementations must + * have a public, 0-arg constructor. + * + * The major responsibility of this interface is to return a {@link Table} for read/write. + */ +@Evolving +// TODO: do not extend `DataSourceV2`, after we finish the API refactor completely. +public interface TableProvider extends DataSourceV2 { + + /** + * Return a {@link Table} instance to do read/write with user-specified options. + * + * @param options the user-specified options that can identify a table, e.g. file path, Kafka + *topic name, etc. It's an immutable case-insensitive string-to-string map. + */ + Table getTable(DataSourceOptions options); + + /** + * Return a {@link Table} instance to do read/write with user-specified schema and options. + * --- End diff -- thanks for the hint about new paragraph! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r237078844 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.sources.DataSourceRegister; +import org.apache.spark.sql.types.StructType; + +/** + * The base interface for v2 data sources which don't have a real catalog. Implementations must + * have a public, 0-arg constructor. + * + * The major responsibility of this interface is to return a {@link Table} for read/write. + */ +@Evolving +// TODO: do not extend `DataSourceV2`, after we finish the API refactor completely. +public interface TableProvider extends DataSourceV2 { + + /** + * Return a {@link Table} instance to do read/write with user-specified options. + * + * @param options the user-specified options that can identify a table, e.g. file path, Kafka + *topic name, etc. It's an immutable case-insensitive string-to-string map. + */ + Table getTable(DataSourceOptions options); + + /** + * Return a {@link Table} instance to do read/write with user-specified schema and options. + * + * By default this method throws {@link UnsupportedOperationException}, implementations should --- End diff -- what I learned is that, we should only declare checked exceptions. See http://www.javapractices.com/topic/TopicAction.do?Id=171 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237076213 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,26 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. + * Usually this means they their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + */ + def sameResult(other: Expression): Boolean = other match { +case a: Alias => sameResult(a.child) +case _ => this.semanticEquals(other) --- End diff -- we can do ``` CleanupAliases.trimAliases(this) semanticEquals CleanupAliases.trimAliases(other) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
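For readers following this thread, here is a rough sketch of what the suggestion above could look like if applied inside `Expression`. `CleanupAliases.trimAliases` is an existing Catalyst helper; the exact wiring is illustrative rather than the PR's final code:
```
/**
 * Returns true when two expressions will always compute the same result, even if the
 * outputs differ in name (e.g. one side is wrapped in an Alias).
 */
final def sameResult(other: Expression): Boolean =
  CleanupAliases.trimAliases(this) semanticEquals CleanupAliases.trimAliases(other)
```
Trimming aliases on both sides (instead of pattern-matching on `other` alone) is what would allow `sameResult` to be declared `final`, as requested in the review.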
[GitHub] spark pull request #23031: [SPARK-26060][SQL] Track SparkConf entries and ma...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23031#discussion_r237075228 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala --- @@ -2715,4 +2715,11 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { } } } + + test("set command rejects SparkConf entries") { +val ex = intercept[AnalysisException] { + sql("SET spark.task.cpus = 4") --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23031: [SPARK-26060][SQL] Track SparkConf entries and ma...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23031#discussion_r237075170 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala --- @@ -68,4 +68,13 @@ class RuntimeConfigSuite extends SparkFunSuite { assert(!conf.isModifiable("")) assert(!conf.isModifiable("invalid config parameter")) } + + test("reject SparkConf entries") { +val conf = newConf() + +val ex = intercept[AnalysisException] { + conf.set("spark.task.cpus", 4) --- End diff -- can we use `config.CPUS_PER_TASK` instead of hardcoding it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
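A sketch of the requested change, assuming `config` refers to `org.apache.spark.internal.config` and `newConf()` is the suite's existing helper:
```
import org.apache.spark.internal.config

test("reject SparkConf entries") {
  val conf = newConf()

  val ex = intercept[AnalysisException] {
    // reference the registered entry instead of hardcoding "spark.task.cpus"
    conf.set(config.CPUS_PER_TASK.key, 4)
  }
}
```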
[GitHub] spark pull request #23031: [SPARK-26060][SQL] Track SparkConf entries and ma...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23031#discussion_r237074727 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala --- @@ -154,5 +154,9 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) { if (SQLConf.staticConfKeys.contains(key)) { throw new AnalysisException(s"Cannot modify the value of a static config: $key") } +if (sqlConf.setCommandRejectsSparkConfs && +ConfigEntry.findEntry(key) != null && !SQLConf.sqlConfEntries.containsKey(key)) { --- End diff -- We should only reject configs that are registered as a SparkConf. Think about configs that are registered as both a SparkConf and a SQLConf: we shouldn't reject those. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237070739 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,26 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. + * Usually this means they their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + */ + def sameResult(other: Expression): Boolean = other match { +case a: Alias => sameResult(a.child) +case _ => this.semanticEquals(other) --- End diff -- can we also strip the alias of this here? so that we can mark `sameResult` as final. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237069486 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,26 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. + * Usually this means they their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + */ + def sameResult(other: Expression): Boolean = other match { --- End diff -- can you put it in the method doc(both `semanticEquals` and `sameResult`)? This makes sense to me. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23153: [SPARK-26147][SQL] only pull out unevaluable python udf ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23153 thanks, merging to master/2.4! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237065506 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,26 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. + * Usually this means they their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + */ + def sameResult(other: Expression): Boolean = other match { --- End diff -- "erase the name" can also mean remove `Alias`. If we can't clearly tell the difference between `semanticEquals` and `sameResult`, and give a guideline about using which one in which case, I think we should just update `semanticEquals`(i.e. `Canonicalize`). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23130#discussion_r237062653 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala --- @@ -142,4 +144,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog")) } } + + test("skip empty files in non bucketed read") { +withTempDir { dir => + val path = dir.getCanonicalPath + Files.write(Paths.get(path, "empty"), Array.empty[Byte]) + Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8)) + val readback = spark.read.option("wholetext", true).text(path) + + assert(readback.rdd.getNumPartitions === 1) --- End diff -- do you mean `wholetext` mode will force to create one partition per file? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23083: [SPARK-26114][CORE] ExternalSorter's readingIterator fie...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23083 thanks, merging to master/2.4! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23128: [SPARK-26142][SQL] Implement shuffle read metrics in SQL
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23128 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23151: [SPARK-26180][CORE][TEST] Add a withCreateTempDir...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23151#discussion_r237059126 --- Diff: core/src/test/scala/org/apache/spark/SparkFunSuite.scala --- @@ -105,5 +105,16 @@ abstract class SparkFunSuite logInfo(s"\n\n= FINISHED $shortSuiteName: '$testName' =\n") } } - + /** + * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` + * returns. + * + * @todo Probably this method should be moved to a more general place + */ + protected def withCreateTempDir(f: File => Unit): Unit = { --- End diff -- if we have this function in `SparkFunSuite`, why do we need to define it again in `SQLTestUtils`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23151: [SPARK-26180][CORE][TEST] Add a withCreateTempDir...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23151#discussion_r237058872 --- Diff: core/src/test/scala/org/apache/spark/SparkFunSuite.scala --- @@ -105,5 +105,16 @@ abstract class SparkFunSuite logInfo(s"\n\n= FINISHED $shortSuiteName: '$testName' =\n") } } - + /** + * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` + * returns. + * + * @todo Probably this method should be moved to a more general place --- End diff -- I think this is the most general place we can find... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
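For context, a minimal sketch of such a helper, assuming the usual `Utils.createTempDir()` / `Utils.deleteRecursively()` utilities from `org.apache.spark.util.Utils`; the PR's actual body may differ:
```
import java.io.File

import org.apache.spark.util.Utils

/**
 * Creates a temporary directory, which is then passed to `f` and will be deleted
 * after `f` returns.
 */
protected def withCreateTempDir(f: File => Unit): Unit = {
  val dir = Utils.createTempDir()
  try f(dir) finally Utils.deleteRecursively(dir)
}
```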
[GitHub] spark pull request #23132: [SPARK-26163][SQL] Parsing decimals from JSON usi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23132#discussion_r237058331 --- Diff: docs/sql-migration-guide-upgrade.md --- @@ -9,6 +9,8 @@ displayTitle: Spark SQL Upgrading Guide ## Upgrading From Spark SQL 2.4 to 3.0 + - In Spark version 2.4 and earlier, accepted format of decimals parsed from JSON is an optional sign ('+' or '-'), followed by a sequence of zero or more decimal digits, optionally followed by a fraction, optionally followed by an exponent. Any commas were removed from the input before parsing. Since Spark 3.0, format varies and depends on locale which can be set via JSON option `locale`. The default locale is `en-US`. To switch back to previous behavior, set `spark.sql.legacy.decimalParsing.enabled` to `true`. --- End diff -- I have the same question. Do we need the `DecimalFormat` when locale is `en-US`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22887: [SPARK-25880][CORE] user set's hadoop conf should not ov...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22887 Spark SQL's SET command can't update any static config or Spark core configs, but I think hadoop configs are different. They are not static, as users can update them via `SparkContext.hadoopConfiguration`. `SparkSession.SessionState.newHadoopConf()` is a mechanism to allow users to set hadoop configs per-session in Spark SQL. So it's reasonable for users to expect that, if they set a hadoop config via the SQL SET command, it should override the one in `spark-defaults.conf`. Looking back at `appendS3AndSparkHadoopConfigurations`, it has 2 parameters: the spark conf and the hadoop conf. The spark conf comes from `spark-defaults.conf` and any user-provided configs when building the `SparkContext`; the user-provided configs override `spark-defaults.conf`. The hadoop conf is either an empty config (if `appendS3AndSparkHadoopConfigurations` is called from `SparkHadoopUtil.newHadoopConfiguration`), or one from `SparkSession.SessionState.newHadoopConf()` (if `appendS3AndSparkHadoopConfigurations` is called from `HadoopTableReader`). For the first case, there is nothing we need to worry about. For the second case, I think the hadoop config should take priority, as it contains the configs specified by users at runtime. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
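To illustrate the per-session mechanism described above, a hedged sketch (usable from Spark's own `sql` package, since `sessionState` is not a public API; the key is only an example):
```
// a hadoop key set on the session's runtime conf ...
spark.conf.set("mapreduce.input.fileinputformat.split.maxsize", "134217728")

// ... is applied on top of the base hadoop conf when the session builds one
val hadoopConf = spark.sessionState.newHadoopConf()
assert(hadoopConf.get("mapreduce.input.fileinputformat.split.maxsize") == "134217728")
```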
[GitHub] spark pull request #22957: [SPARK-25951][SQL] Ignore aliases for distributio...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22957#discussion_r237049166 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -195,14 +195,26 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. + * Usually this means they their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + */ + def sameResult(other: Expression): Boolean = other match { --- End diff -- I know it's always safer to introduce a new API, but is it really necessary? In `Canonicalize`, we erase the names of attributes; I think it's reasonable to erase the name of an `Alias` as well, since it doesn't affect the output. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23120: [SPARK-26151][SQL] Return partial results for bad...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23120#discussion_r237046616 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala --- @@ -33,26 +33,21 @@ class FailureSafeParser[IN]( private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord) private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord)) private val resultRow = new GenericInternalRow(schema.length) - private val nullResult = new GenericInternalRow(schema.length) // This function takes 2 parameters: an optional partial result, and the bad record. If the given // schema doesn't contain a field for corrupted record, we just return the partial result or a // row with all fields null. If the given schema contains a field for corrupted record, we will // set the bad record to this field, and set other fields according to the partial result or null. private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = { -if (corruptFieldIndex.isDefined) { - (row, badRecord) => { -var i = 0 -while (i < actualSchema.length) { - val from = actualSchema(i) - resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull - i += 1 -} -resultRow(corruptFieldIndex.get) = badRecord() -resultRow +(row, badRecord) => { --- End diff -- without this change in `FailureSafeParser`, does JSON support returning partial result? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23120: [SPARK-26151][SQL] Return partial results for bad...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23120#discussion_r237046251 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala --- @@ -243,21 +243,27 @@ class UnivocityParser( () => getPartialResult(), new RuntimeException("Malformed CSV record")) } else { - try { -// When the length of the returned tokens is identical to the length of the parsed schema, -// we just need to convert the tokens that correspond to the required columns. -var i = 0 -while (i < requiredSchema.length) { + // When the length of the returned tokens is identical to the length of the parsed schema, + // we just need to convert the tokens that correspond to the required columns. + var badRecordException: Option[Throwable] = None + var i = 0 + while (i < requiredSchema.length) { --- End diff -- shall we stop parsing when we hit the first exception? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
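A sketch of that suggestion against the loop shown in the diff — `row`, `valueConverters`, `tokens`, and `requiredSchema` are the surrounding names in `UnivocityParser`, `scala.util.control.NonFatal` is assumed to be imported, and the early-exit condition is the illustrative part:
```
var badRecordException: Option[Throwable] = None
var i = 0
// stop converting further columns once the first conversion failure is recorded
while (badRecordException.isEmpty && i < requiredSchema.length) {
  try {
    row(i) = valueConverters(i).apply(tokens(i))
  } catch {
    case NonFatal(e) => badRecordException = Some(e)
  }
  i += 1
}
```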
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23130#discussion_r237045706 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala --- @@ -142,4 +144,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog")) } } + + test("skip empty files in non bucketed read") { +withTempDir { dir => + val path = dir.getCanonicalPath + Files.write(Paths.get(path, "empty"), Array.empty[Byte]) + Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8)) + val readback = spark.read.option("wholetext", true).text(path) + + assert(readback.rdd.getNumPartitions === 1) --- End diff -- does this test fail without your change? IIUC one partition can read multiple files. Is JSON the only data source that may return a row for empty file? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21004#discussion_r237040743 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala --- @@ -126,35 +126,32 @@ abstract class PartitioningAwareFileIndex( val caseInsensitiveOptions = CaseInsensitiveMap(parameters) val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION) .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone) - -userPartitionSchema match { +val inferredPartitionSpec = PartitioningUtils.parsePartitions( + leafDirs, + typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled, --- End diff -- Before this patch, there was a subtle difference between with and without a user-provided partition schema: 1. with user-provided partition schema, we should not infer data types. We should infer as string and cast to user-provided type 2. without user-provided partition schema, we should infer the data type(with a config) So it was wrong to unify these 2 code paths. @gengliangwang can you change it back? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23052: [SPARK-26081][SQL] Prevent empty files for empty partiti...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23052 seems like a real failure --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23141: [SPARK-26021][SQL][followup] add test for special floati...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23141 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23128: [SPARK-26142][SQL] Implement shuffle read metrics...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23128#discussion_r236976020 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala --- @@ -154,7 +156,10 @@ class ShuffledRowRDD( override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition] -val metrics = context.taskMetrics().createTempShuffleReadMetrics() +val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics() +// Wrap the tempMetrics with SQLShuffleMetricsReporter here to support +// shuffle read metrics in SQL. --- End diff -- ``` // `SQLShuffleMetricsReporter` will update its own metrics for SQL exchange operator, // as well as the `tempMetrics` for basic shuffle metrics. ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23128: [SPARK-26142][SQL] Implement shuffle read metrics in SQL
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23128 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23153: [SPARK-26147][SQL] only pull out unevaluable python udf ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23153 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23052: [SPARK-26081][SQL] Prevent empty files for empty partiti...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23052 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23083: [SPARK-26114][CORE] ExternalSorter's readingIterator fie...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23083 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r236962499 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types._ + +/** + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes + * duplicated map keys w.r.t. the last wins policy. + */ +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable { + assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map") + assert(keyType != NullType, "map key cannot be null type.") + + private lazy val keyToIndex = keyType match { +case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int] --- End diff -- I think for performance critical code path we should prefer java collection. thanks for pointing it out! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
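As a self-contained illustration of the point about preferring Java collections on hot paths, a hedged sketch of a last-wins map builder — names and structure are illustrative, not the PR's final code:
```
import scala.collection.mutable.ArrayBuffer

class LastWinsMapBuilder {
  // a java.util.HashMap is typically faster than scala.collection.mutable.HashMap
  // on performance-critical paths
  private val keyToIndex = new java.util.HashMap[Any, Int]()
  private val keys = ArrayBuffer.empty[Any]
  private val values = ArrayBuffer.empty[Any]

  def put(key: Any, value: Any): Unit = {
    if (key == null) {
      throw new RuntimeException("Cannot use null as map key.")
    }
    val index = keyToIndex.getOrDefault(key, -1)
    if (index < 0) {
      // first time we see this key: remember its position
      keyToIndex.put(key, keys.length)
      keys += key
      values += value
    } else {
      // duplicated key: the last value wins
      values(index) = value
    }
  }
}
```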
[GitHub] spark issue #23052: [SPARK-26081][SQL] Prevent empty files for empty partiti...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23052 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23086: [SPARK-25528][SQL] data source v2 API refactor (b...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23086#discussion_r236957293 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; --- End diff -- It's unclear to me what would be the best choice: 1. move the data source API to the catalyst module, 2. move the data-source-related rules to the SQL core module, or 3. define private catalog-related APIs in the catalyst module and implement them in SQL core. Can we delay the discussion until we have a PR that adds catalog support after the refactor? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r236949897 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -558,8 +558,11 @@ private[parquet] class ParquetRowConverter( override def getConverter(fieldIndex: Int): Converter = keyValueConverter -override def end(): Unit = +override def end(): Unit = { + // The parquet map may contains null or duplicated map keys. When it happens, the behavior is + // undefined. --- End diff -- done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23127: [SPARK-26159] Codegen for LocalTableScanExec and RDDScan...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23127 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23083: [SPARK-26114][CORE] ExternalSorter's readingIterator fie...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23083 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23128: [SPARK-26142][SQL] Support passing shuffle metrics to ex...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23128 LGTM except one comment --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23128#discussion_r236637264 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala --- @@ -154,7 +156,14 @@ class ShuffledRowRDD( override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition] -val metrics = context.taskMetrics().createTempShuffleReadMetrics() +val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics() +// metrics here could be empty cause user can use ShuffledRowRDD directly, --- End diff -- do you mean we may leave the `metrics` empty when creating `ShuffledRowRDD` in tests? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23128: [SPARK-26142][SQL] Support passing shuffle metric...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23128#discussion_r236636819 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala --- @@ -154,7 +156,14 @@ class ShuffledRowRDD( override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition] -val metrics = context.taskMetrics().createTempShuffleReadMetrics() +val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics() +// metrics here could be empty cause user can use ShuffledRowRDD directly, --- End diff -- I don't think we need to consider this case since `ShuffledRowRDD` is a private API. If we do need to consider it, we also need to take care if users pass in a `metrics` that is invalid. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23138: [SPARK-23356][SQL][TEST] add new test cases for a + 1,a ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23138 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23083: [SPARK-26114][CORE] ExternalSorter's readingIterator fie...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23083 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22887: [SPARK-25880][CORE] user set's hadoop conf should not ov...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22887 > Basically, if my "core-size.xml" says "mapreduce.input.fileinputformat.split.maxsize" is 2, and my Spark conf says "spark.hadoop.mapreduce.input.fileinputformat.split.maxsize" is 3, then the value from the config generated by the method you're changing must be 3. I think this is what this PR tries to fix? The `hadoopConf` parameter of `appendS3AndSparkHadoopConfigurations` is either an empty one, or one from `spark.SessionState.newHadoopConf()`, which contains the user-provided hadoop conf. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23104: [SPARK-26138][SQL] Cross join requires push Local...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23104#discussion_r236625590 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -459,6 +459,7 @@ object LimitPushDown extends Rule[LogicalPlan] { val newJoin = joinType match { case RightOuter => join.copy(right = maybePushLocalLimit(exp, right)) case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left)) +case Cross => join.copy(left = maybePushLocalLimit(exp, left), right = maybePushLocalLimit(exp, right)) --- End diff -- if there is no join condition, I think join type doesn't matter and we can always push down limits. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
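A sketch of that idea against the `LimitPushDown` rule in the diff — `join`, `left`, `right`, `exp`, and `maybePushLocalLimit` are the rule's existing names; the no-condition guard is the illustrative part, following the reasoning above:
```
val newJoin = joinType match {
  case RightOuter => join.copy(right = maybePushLocalLimit(exp, right))
  case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left))
  // with no join condition the join type doesn't matter, so the limit can be
  // pushed into both children
  case _ if join.condition.isEmpty =>
    join.copy(
      left = maybePushLocalLimit(exp, left),
      right = maybePushLocalLimit(exp, right))
  case _ => join
}
```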
[GitHub] spark pull request #23153: [SPARK-26147][SQL] only pull out unevaluable pyth...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/23153 [SPARK-26147][SQL] only pull out unevaluable python udf from join condition ## What changes were proposed in this pull request? https://github.com/apache/spark/pull/22326 made a mistake: not all Python UDFs are unevaluable in join conditions. Only Python UDFs that refer to attributes from both join sides are unevaluable. This PR fixes that mistake. ## How was this patch tested? A new test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark join Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23153.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23153 commit cb195cf4b08aef9f4beb3ed4c18580fe5a76c65c Author: Wenchen Fan Date: 2018-11-27T11:19:12Z only pull out unevaluable python udf from join condition --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23153: [SPARK-26147][SQL] only pull out unevaluable python udf ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23153 @xuanyuanking @HyukjinKwon @gatorsmile --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23124: [SPARK-25829][SQL] remove duplicated map keys with last ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23124 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23083: [SPARK-26114][CORE] ExternalSorter's readingIterator fie...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23083 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23128: [SPARK-26142][SQL] Support passing shuffle metrics to ex...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23128 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23083: [SPARK-26114][CORE] ExternalSorter's readingIterator fie...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23083 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23128: [SPARK-26142][SQL] Support passing shuffle metrics to ex...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23128 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23130: [SPARK-26161][SQL] Ignore empty files in load
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23130#discussion_r236515927 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -388,7 +388,7 @@ case class FileSourceScanExec( logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") val filesGroupedToBuckets = selectedPartitions.flatMap { p => -p.files.map { f => +p.files.filter(_.getLen > 0).map { f => --- End diff -- yes, and the same change is also in `createNonBucketedReadRDD` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org