[GitHub] spark issue #21948: [SPARK-24991][SQL] use InternalRow in DataSourceWriter
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21948 @cloud-fan, thanks for documenting the behavior and removing the default copy. I had a couple of questions, but I think it is close. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21948: [SPARK-24991][SQL] use InternalRow in DataSourceW...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21948#discussion_r207722363 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala --- @@ -44,16 +46,16 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter { val writer = new MemoryStreamWriter(sink, OutputMode.Append(), new StructType().add("i", "int")) writer.commit(0, Array( -MemoryWriterCommitMessage(0, Seq(InternalRow(1), InternalRow(2))), -MemoryWriterCommitMessage(1, Seq(InternalRow(3), InternalRow(4))), -MemoryWriterCommitMessage(2, Seq(InternalRow(6), InternalRow(7))) +MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))), --- End diff -- Why was this changed back to Row?
[GitHub] spark pull request #21948: [SPARK-24991][SQL] use InternalRow in DataSourceW...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21948#discussion_r207722340 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala --- @@ -89,7 +89,8 @@ class RateStreamContinuousReader(options: DataSourceOptions) extends ContinuousR start.runTimeMs, i, numPartitions, -perPartitionRate): InputPartition[InternalRow] +perPartitionRate) +.asInstanceOf[InputPartition[InternalRow]] --- End diff -- Why is this cast necessary?
[GitHub] spark issue #21978: SPARK-25006: Add CatalogTableIdentifier.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21978 FYI @jzhuge
[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21977 @holdenk, I attempted to write a YARN unit test for this, but evidently the MiniYARNCluster doesn't run Python workers. The task is run, but a worker is never started. If you have any idea how to fix this, I think we could have an easy test. Here's what I have so far: https://gist.github.com/rdblue/9848a00f49eaad6126fbbcfa1b039e19
[GitHub] spark pull request #21977: SPARK-25004: Add spark.executor.pyspark.memory li...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r207690569 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala --- @@ -81,6 +82,17 @@ case class AggregateInPandasExec( val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) +val memoryMb = { --- End diff -- I went ahead with the refactor.
[GitHub] spark pull request #21977: SPARK-25004: Add spark.executor.pyspark.memory li...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r207637946 --- Diff: python/pyspark/worker.py --- @@ -259,6 +260,26 @@ def main(infile, outfile): "PYSPARK_DRIVER_PYTHON are correctly set.") % ("%d.%d" % sys.version_info[:2], version)) +# set up memory limits +memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1")) +total_memory = resource.RLIMIT_AS +try: +(total_memory_limit, max_total_memory) = resource.getrlimit(total_memory) +msg = "Current mem: {0} of max {1}\n".format(total_memory_limit, max_total_memory) +sys.stderr.write() + +if memory_limit_mb > 0 and total_memory_limit < 0: --- End diff -- I've updated this to use `resource.RLIM_INFINITY`. I think this should only set the resource limit if it isn't already set. It is unlikely that it's already set because this is during worker initialization, but the intent is to not cause harm if a higher-level system (e.g., a container provider) has already set the limit.
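The "only set the limit if it isn't already set" rule described above can be sketched as follows. This is a minimal illustration, not the code from the PR: the helper names `new_limit_bytes` and `apply_limit` are hypothetical, and the sketch assumes a Unix platform where the standard `resource` module is available.

```python
import resource

# Sentinel the resource module uses for "no limit".
UNLIMITED = resource.RLIM_INFINITY


def new_limit_bytes(memory_limit_mb, current_soft_limit):
    """Return the address-space limit to install, or None to leave things alone.

    Hypothetical helper: a limit is installed only when one is configured
    and nothing else (for example a container provider) has set one already.
    """
    if memory_limit_mb <= 0:
        return None  # no limit configured
    if current_soft_limit != UNLIMITED:
        return None  # already limited by a higher-level system; do no harm
    return memory_limit_mb * 1024 * 1024


def apply_limit(memory_limit_mb):
    """Install the limit on the current process; returns True if it was set."""
    soft, hard = resource.getrlimit(resource.RLIMIT_AS)
    wanted = new_limit_bytes(memory_limit_mb, soft)
    if wanted is None:
        return False
    try:
        resource.setrlimit(resource.RLIMIT_AS, (wanted, wanted))
    except (ValueError, OSError):
        return False  # not permitted on this system; continue unlimited
    return True
```

Keeping the decision in a pure function makes the rule easy to unit test without actually restricting the test process.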
[GitHub] spark pull request #21977: SPARK-25004: Add spark.executor.pyspark.memory li...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r207636504 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala --- @@ -51,6 +52,17 @@ private[spark] class PythonRDD( val bufferSize = conf.getInt("spark.buffer.size", 65536) val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) + val memoryMb = { --- End diff -- I thought the comments below were clear: if a single worker is reused, it gets the entire allocation. If each core starts its own worker, each one gets an equal share. If `reuseWorker` is actually ignored, then this needs to be updated.
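The allocation rule stated in this comment can be written down in a few lines. The sketch below only mirrors the rule as worded here (whole allocation for a reused worker, equal shares otherwise); the function name and parameters are hypothetical, and the comment itself notes the rule would need to change if `reuseWorker` turns out to be ignored.

```python
def memory_per_worker_mb(total_mb, cores, reuse_worker):
    """Split the PySpark memory allocation across Python workers.

    Hypothetical helper illustrating the rule in the review comment:
    - a single reused worker gets the entire allocation;
    - otherwise each core starts its own worker and gets an equal share.
    Returns None when no allocation is configured.
    """
    if total_mb is None:
        return None
    if reuse_worker:
        return total_mb
    return total_mb // max(cores, 1)
```

For example, a 2048 MB allocation on 4 cores would give each worker 512 MB when workers are not reused.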
[GitHub] spark pull request #21977: SPARK-25004: Add spark.executor.pyspark.memory li...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r207635841 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala --- @@ -81,6 +82,17 @@ case class AggregateInPandasExec( val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) +val memoryMb = { --- End diff -- The other configuration options are already duplicated, so I was trying to make as few changes as possible. Since there are several duplicated options, I think it makes more sense to pass the SparkConf through to PythonRunner so it can extract its own configuration. @holdenk, would you like this refactor done in this PR, or should I do it in a follow-up?
[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21977 @ifilonenko, I opened follow-up SPARK-25021 for adding the PySpark memory allocation to Kubernetes. @mccheah, I opened follow-up SPARK-25022 for Mesos.
[GitHub] spark pull request #21977: SPARK-25004: Add spark.executor.pyspark.memory li...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r207631295 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -333,7 +340,7 @@ private[spark] class Client( val maxMem = newAppResponse.getMaximumResourceCapability().getMemory() logInfo("Verifying our application has not requested more than the maximum " + s"memory capability of the cluster ($maxMem MB per container)") -val executorMem = executorMemory + executorMemoryOverhead +val executorMem = executorMemory + executorMemoryOverhead + pysparkWorkerMemory if (executorMem > maxMem) { throw new IllegalArgumentException(s"Required executor memory ($executorMemory" + s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " + --- End diff -- I like having it broken out so users can see where their allocation is going. Otherwise, users who only know about `spark.executor.memory` might not understand why their allocation is 1 GB higher when running PySpark. I've updated this to include the worker memory.
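To make the "broken out" error message concrete, here is a small sketch of the container-size check with each component reported separately. It is an illustration in Python rather than the Scala in `Client.scala`; the function name and the exact message wording are assumptions.

```python
def verify_executor_memory(executor_mb, overhead_mb, pyspark_mb, max_mb):
    """Check that the requested container size fits the cluster maximum.

    Hypothetical helper: the total request is executor memory plus overhead
    plus PySpark worker memory, and the error lists each part so users can
    see where their allocation is going.
    """
    total_mb = executor_mb + overhead_mb + pyspark_mb
    if total_mb > max_mb:
        raise ValueError(
            f"Required executor memory ({executor_mb} MB), "
            f"overhead ({overhead_mb} MB), and PySpark memory ({pyspark_mb} MB) "
            f"is above the max threshold ({max_mb} MB) of this cluster!"
        )
    return total_mb
```

Listing the three parts separately means a user who only set `spark.executor.memory` can still see which component pushed the request over the limit.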
[GitHub] spark pull request #21977: SPARK-25004: Add spark.executor.pyspark.memory li...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r207630342 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -114,6 +114,10 @@ package object config { .checkValue(_ >= 0, "The off-heap memory size must not be negative") .createWithDefault(0) + private[spark] val PYSPARK_EXECUTOR_MEMORY = ConfigBuilder("spark.executor.pyspark.memory") + .bytesConf(ByteUnit.MiB) + .createOptional --- End diff -- Fixed.
[GitHub] spark pull request #21977: SPARK-25004: Add spark.executor.pyspark.memory li...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21977#discussion_r207629997 --- Diff: python/pyspark/worker.py --- @@ -259,6 +260,26 @@ def main(infile, outfile): "PYSPARK_DRIVER_PYTHON are correctly set.") % ("%d.%d" % sys.version_info[:2], version)) +# set up memory limits +memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1")) +total_memory = resource.RLIMIT_AS +try: +(total_memory_limit, max_total_memory) = resource.getrlimit(total_memory) +msg = "Current mem: {0} of max {1}\n".format(total_memory_limit, max_total_memory) +sys.stderr.write() --- End diff -- Fixed.
[GitHub] spark issue #21978: SPARK-25006: Add CatalogTableIdentifier.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21978 Retest this please.
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r207624290 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2217,6 +2218,98 @@ class Analyzer( } } + /** + * Resolves columns of an output table from the data in a logical plan. This rule will: + * + * - Reorder columns when the write is by name + * - Insert safe casts when data types do not match + * - Insert aliases when column names do not match + * - Detect plans that are not compatible with the output table and throw AnalysisException + */ + object ResolveOutputRelation extends Rule[LogicalPlan] { +override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { + case append @ AppendData(table, query, isByName) + if table.resolved && query.resolved && !append.resolved => +val projection = resolveOutputColumns(table.name, table.output, query, isByName) + +if (projection != query) { + append.copy(query = projection) +} else { + append +} +} + +def resolveOutputColumns( +tableName: String, +expected: Seq[Attribute], +query: LogicalPlan, +byName: Boolean): LogicalPlan = { + + if (expected.size < query.output.size) { +throw new AnalysisException( + s"""Cannot write to '$tableName', too many data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) + } + + val errors = new mutable.ArrayBuffer[String]() + val resolved: Seq[NamedExpression] = if (byName) { +expected.flatMap { tableAttr => + query.resolveQuoted(tableAttr.name, resolver) match { +case Some(queryExpr) => + checkField(tableAttr, queryExpr, err => errors += err) --- End diff -- I'd much rather pass functions than mutable state into other methods (side-effects). Plus, a function is cleaner because it doesn't require the caller to use a particular kind of storage. If this were in a tight loop, there would be an argument for changing it, but this only happens once per plan.
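The design point in this comment, passing an error-reporting function instead of a mutable collection, can be illustrated with a short Python sketch. The `check_field` signature and the string type names below are hypothetical and only stand in for the Scala `checkField` call in the diff.

```python
def check_field(column_name, table_type, query_type, add_error):
    """Validate one column and report problems through a callback.

    Hypothetical helper: the callee never sees how errors are stored,
    it only calls add_error(message).
    """
    if table_type != query_type:
        add_error(
            f"Cannot write '{column_name}': expected {table_type}, found {query_type}"
        )
        return False
    return True


def collect_errors(columns):
    """Caller decides on storage; here a plain list is used."""
    errors = []
    for name, table_type, query_type in columns:
        check_field(name, table_type, query_type, errors.append)
    return errors
```

Because the callee only receives `add_error`, a different caller could just as well count errors, log them, or raise on the first one without any change to `check_field`.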
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r207623469 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -352,6 +351,36 @@ case class Join( } } +/** + * Append data to an existing table. + */ +case class AppendData( +table: NamedRelation, +query: LogicalPlan, +isByName: Boolean) extends LogicalPlan { + override def children: Seq[LogicalPlan] = Seq(query) + override def output: Seq[Attribute] = Seq.empty + + override lazy val resolved: Boolean = { +query.output.size == table.output.size && query.output.zip(table.output).forall { + case (inAttr, outAttr) => + inAttr.name == outAttr.name &&// names must match + outAttr.dataType.sameType(inAttr.dataType) && // types must match --- End diff -- Good catch, I agree.
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r207623351 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -352,6 +351,36 @@ case class Join( } } +/** + * Append data to an existing table. + */ +case class AppendData( +table: NamedRelation, +query: LogicalPlan, +isByName: Boolean) extends LogicalPlan { + override def children: Seq[LogicalPlan] = Seq(query) + override def output: Seq[Attribute] = Seq.empty + + override lazy val resolved: Boolean = { +query.output.size == table.output.size && query.output.zip(table.output).forall { + case (inAttr, outAttr) => + inAttr.name == outAttr.name &&// names must match + outAttr.dataType.sameType(inAttr.dataType) && // types must match --- End diff -- Good catch.
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r207591742 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala --- @@ -336,4 +337,97 @@ object DataType { case (fromDataType, toDataType) => fromDataType == toDataType } } + + /** + * Returns true if the write data type can be read using the read data type. + * + * The write type is compatible with the read type if: + * - Both types are arrays, the array element types are compatible, and element nullability is + * compatible (read allows nulls or write does not contain nulls). + * - Both types are maps and the map key and value types are compatible, and value nullability + * is compatible (read allows nulls or write does not contain nulls). + * - Both types are structs and each field in the read struct is present in the write struct and + * compatible (including nullability), or is nullable if the write struct does not contain the + * field. Write-side structs are not compatible if they contain fields that are not present in + * the read-side struct. + * - Both types are atomic and the write type can be safely cast to the read type. + * + * Extra fields in write-side structs are not allowed to avoid accidentally writing data that + * the read schema will not read, and to ensure map key equality is not changed when data is read. 
+ * + * @param write a write-side data type to validate against the read type + * @param read a read-side data type + * @return true if data written with the write type can be read using the read type + */ + def canWrite( + write: DataType, + read: DataType, + resolver: Resolver, + context: String, + addError: String => Unit = (_: String) => {}): Boolean = { +(write, read) match { + case (wArr: ArrayType, rArr: ArrayType) => +if (wArr.containsNull && !rArr.containsNull) { + addError(s"Cannot write nullable elements to array of non-nulls: '$context'") + false +} else { + canWrite(wArr.elementType, rArr.elementType, resolver, context + ".element", addError) +} + + case (wMap: MapType, rMap: MapType) => +// map keys cannot include data fields not in the read schema without changing equality when +// read. map keys can be missing fields as long as they are nullable in the read schema. +if (wMap.valueContainsNull && !rMap.valueContainsNull) { + addError(s"Cannot write nullable values to map of non-nulls: '$context'") + false +} else { + canWrite(wMap.keyType, rMap.keyType, resolver, context + ".key", addError) && + canWrite(wMap.valueType, rMap.valueType, resolver, context + ".value", addError) +} + + case (StructType(writeFields), StructType(readFields)) => +lazy val extraFields = writeFields.map(_.name).toSet -- readFields.map(_.name) + +var result = readFields.forall { readField => + val fieldContext = context + "." + readField.name + writeFields.find(writeField => resolver(writeField.name, readField.name)) match { --- End diff -- Yeah, I've been thinking about whether the rest of the code actually implements the rules that `canWrite` expects. This may not work, but I don't think it makes sense to add nested struct reordering if it isn't done already in this commit. Should we disable nested structures, or go ahead with this validation and fix reordering later? 
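The compatibility rules listed in the `canWrite` doc comment can be modelled compactly. The sketch below is a simplified Python rendering under several assumptions: atomic types are plain strings, the `SAFE_CASTS` table is invented for illustration (it is not Spark's cast table), and struct fields are matched by exact name instead of through a resolver.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class Array:
    element: "DataType"
    contains_null: bool


@dataclass(frozen=True)
class Map:
    key: "DataType"
    value: "DataType"
    value_contains_null: bool


@dataclass(frozen=True)
class Field:
    name: str
    data_type: "DataType"
    nullable: bool


@dataclass(frozen=True)
class Struct:
    fields: Tuple[Field, ...]


DataType = Union[str, Array, Map, Struct]

# Assumed widening rules for atomic types, for illustration only.
SAFE_CASTS = {("int", "long"), ("float", "double")}


def can_write(write, read, context, add_error=lambda message: None):
    """Return True if data written as `write` can be read back as `read`."""
    if isinstance(write, Array) and isinstance(read, Array):
        if write.contains_null and not read.contains_null:
            add_error(f"Cannot write nullable elements to array of non-nulls: '{context}'")
            return False
        return can_write(write.element, read.element, context + ".element", add_error)

    if isinstance(write, Map) and isinstance(read, Map):
        if write.value_contains_null and not read.value_contains_null:
            add_error(f"Cannot write nullable values to map of non-nulls: '{context}'")
            return False
        key_ok = can_write(write.key, read.key, context + ".key", add_error)
        value_ok = can_write(write.value, read.value, context + ".value", add_error)
        return key_ok and value_ok

    if isinstance(write, Struct) and isinstance(read, Struct):
        write_fields = {field.name: field for field in write.fields}
        ok = True
        for read_field in read.fields:
            field_context = f"{context}.{read_field.name}"
            write_field = write_fields.get(read_field.name)
            if write_field is None:
                # A missing field is acceptable only if the reader allows nulls.
                if not read_field.nullable:
                    add_error(f"Missing required (non-null) field: '{field_context}'")
                    ok = False
            elif write_field.nullable and not read_field.nullable:
                add_error(f"Cannot write nullable values to non-null field: '{field_context}'")
                ok = False
            elif not can_write(write_field.data_type, read_field.data_type,
                               field_context, add_error):
                ok = False
        extra = sorted(set(write_fields) - {field.name for field in read.fields})
        if extra:
            add_error(f"Cannot write extra fields to struct '{context}': {', '.join(extra)}")
            ok = False
        return ok

    if isinstance(write, str) and isinstance(read, str):
        if write == read or (write, read) in SAFE_CASTS:
            return True
        add_error(f"Cannot safely cast '{context}': {write} to {read}")
        return False

    add_error(f"Cannot write '{context}': incompatible type kinds")
    return False
```

Note that this check looks struct fields up by name, so it accepts nested fields that appear in a different order. Whether the write path then reorders those nested fields is exactly the open question raised in the comment above.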
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r207590794 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2217,6 +2218,98 @@ class Analyzer( } } + /** + * Resolves columns of an output table from the data in a logical plan. This rule will: + * + * - Reorder columns when the write is by name + * - Insert safe casts when data types do not match + * - Insert aliases when column names do not match + * - Detect plans that are not compatible with the output table and throw AnalysisException --- End diff -- Agreed. I'm already working on a suite for `canWrite` and will be adding better tests shortly.
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r207395603 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -352,6 +351,36 @@ case class Join( } } +/** + * Append data to an existing table. + */ +case class AppendData( +table: NamedRelation, +query: LogicalPlan, +isByName: Boolean) extends LogicalPlan { + override def children: Seq[LogicalPlan] = Seq(query) --- End diff -- I also see that `InsertIntoTable` doesn't list the relation as a child.
[GitHub] spark issue #21978: SPARK-25006: Add CatalogTableIdentifier.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21978 @gatorsmile and @cloud-fan, this adds catalog to `TableIdentifier` in preparation for multi-catalog support. `TableIdentifier` continues to work as-is to ensure that there are no behavior changes in code paths that do not have catalog support. I've updated `UnresolvedRelation` to demonstrate how migration to `CatalogTableIdentifier` will work.
[GitHub] spark pull request #21978: SPARK-25006: Add CatalogTableIdentifier.
GitHub user rdblue opened a pull request: https://github.com/apache/spark/pull/21978 SPARK-25006: Add CatalogTableIdentifier. ## What changes were proposed in this pull request? This adds `CatalogTableIdentifier`, which is an identifier that consists of a triple: catalog, database, and table. Catalog and database are optional. The existing `TableIdentifier` class extends `CatalogTableIdentifier` and is guaranteed to have no defined catalog component. Classes that expect a `TableIdentifier` will continue to use `TableIdentifier` to ensure that catalogs are not leaked into code paths that do not support them. This adds a parser rule, `catalogTableIdentifier`, that can parse identifiers with a catalog. An identifier with only two components will match database and table, leaving the catalog undefined. Only identifiers with three components will have a defined catalog. In addition, rules must be rewritten to support `catalogTableIdentifier`. Existing rules will continue to use `tableIdentifier` with no catalog. ## How was this patch tested? Existing tests. This should not change any behavior. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/spark SPARK-25006-add-catalog-to-table-identifier Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21978.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21978 commit d61e0f1ccdce630d65c81ee27a233a14759415ea Author: Ryan Blue Date: 2018-08-02T21:03:21Z SPARK-25006: Add CatalogTableIdentifier. This adds CatalogTableIdentifier, which is an identifier that consists of a triple: catalog, database, and table. Catalog and database are optional. The existing TableIdentifier class extends CatalogTableIdentifier and is guaranteed to have no defined catalog component. Classes that expect a TableIdentifier should continue to use TableIdentifier to ensure that catalogs are not leaked into code paths that do not support them.
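The component-matching rule described in this pull request (two parts mean database and table; only three parts define a catalog) can be sketched as below. This is a Python illustration with hypothetical names, not the ANTLR parser rule from the PR, and it assumes unquoted identifiers split on dots.

```python
from typing import NamedTuple, Optional


class CatalogTableIdentifier(NamedTuple):
    """Illustrative triple; catalog and database are optional."""
    table: str
    database: Optional[str] = None
    catalog: Optional[str] = None


def parse_identifier(text):
    """Parse 'table', 'db.table', or 'catalog.db.table'.

    Simplifying assumption: parts are unquoted, so splitting on '.' is
    enough. Backtick-quoted names are not handled.
    """
    parts = text.split(".")
    if not 1 <= len(parts) <= 3 or any(not part for part in parts):
        raise ValueError(f"Invalid table identifier: {text!r}")
    # Right-align the parts so the last one is always the table.
    catalog, database, table = [None] * (3 - len(parts)) + parts
    return CatalogTableIdentifier(table=table, database=database, catalog=catalog)
```

With this rule, `db.events` leaves the catalog undefined, while `prod.db.events` is the only form that sets it.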
[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21305 Retest this please
[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21977 @holdenk, can you help review this since it is related to PySpark?
[GitHub] spark pull request #21977: SPARK-25004: Add spark.executor.pyspark.memory li...
GitHub user rdblue opened a pull request: https://github.com/apache/spark/pull/21977 SPARK-25004: Add spark.executor.pyspark.memory limit. ## What changes were proposed in this pull request? This adds `spark.executor.pyspark.memory` to configure Python's address space limit, [`resource.RLIMIT_AS`](https://docs.python.org/3/library/resource.html#resource.RLIMIT_AS). Limiting Python's address space allows Python to participate in memory management. In practice, with the limit set, we see fewer cases of Python taking too much memory because it doesn't know to run garbage collection. This results in YARN killing fewer containers. This also improves error messages so users know that Python is consuming too much memory: ``` File "build/bdist.linux-x86_64/egg/package/library.py", line 265, in fe_engineer fe_eval_rec.update(f(src_rec_prep, mat_rec_prep)) File "build/bdist.linux-x86_64/egg/package/library.py", line 163, in fe_comp comparisons = EvaluationUtils.leven_list_compare(src_rec_prep.get(item, []), mat_rec_prep.get(item, [])) File "build/bdist.linux-x86_64/egg/package/evaluationutils.py", line 25, in leven_list_compare permutations = sorted(permutations, reverse=True) MemoryError ``` The new PySpark memory setting is used to increase requested YARN container memory, instead of sharing overhead memory between Python and off-heap JVM activity. ## How was this patch tested? Tested memory limits in our YARN cluster and verified that MemoryError is thrown. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/spark SPARK-25004-add-python-memory-limit Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21977.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21977 commit 19cd9c5cce4420729074a0976b129889d70fd56c Author: Ryan Blue Date: 2018-05-09T18:34:50Z SPARK-25004: Add spark.executor.pyspark.memory limit.
[GitHub] spark issue #21948: [SPARK-24991][SQL] use InternalRow in DataSourceWriter
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21948 I'm changing my +1 to -1 because read-side changes are mixed in and because copies are the responsibility of data sources if they buffer and hold references to earlier rows.
[GitHub] spark pull request #21948: [SPARK-24991][SQL] use InternalRow in DataSourceW...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21948#discussion_r207295461 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala --- @@ -89,8 +89,7 @@ class RateStreamContinuousReader(options: DataSourceOptions) extends ContinuousR start.runTimeMs, i, numPartitions, -perPartitionRate) -.asInstanceOf[InputPartition[InternalRow]] +perPartitionRate): InputPartition[InternalRow] --- End diff -- This should be in a separate commit. I didn't notice yesterday that this is for the writer until it was linked from the other issue. I think this change needs to get in, but it should not be mixed into changes for the write path.
[GitHub] spark pull request #21948: [SPARK-24991][SQL] use InternalRow in DataSourceW...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21948#discussion_r207294283 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java --- @@ -50,4 +50,15 @@ *this ID will always be 0. */ DataWriter createDataWriter(int partitionId, long taskId, long epochId); + + /** + * When true, Spark will reuse the same data object instance when sending data to the data writer, + * for better performance. Data writers should carefully handle the data objects if it's reused, + * e.g. do not buffer the data objects in a list. By default it returns false for safety, data + * sources can override it if their data writers immediately write the data object to somewhere + * else like a memory buffer or disk. + */ + default boolean reuseDataObject() { --- End diff -- I don't think this should be added in this commit. This is to move to `InternalRow` and should not alter the API. I'm fine documenting this, but writers are responsible for defensive copies if necessary. This default is going to cause sources to be slower, and I don't think it is necessary for implementations other than tests that buffer data in memory.
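Why a buffering writer needs defensive copies when the engine reuses one data object can be shown with a toy model. Everything in the sketch below (`MutableRow`, `BufferingWriter`, `feed`) is hypothetical Python, not the Spark `DataWriter` API; it only demonstrates the aliasing problem the comment refers to.

```python
class MutableRow:
    """Stand-in for a row object that the engine reuses between calls."""

    def __init__(self):
        self.values = ()

    def copy(self):
        duplicate = MutableRow()
        duplicate.values = self.values
        return duplicate


class BufferingWriter:
    """Toy writer that holds on to rows, as an in-memory test sink might."""

    def __init__(self, defensive_copy):
        self.defensive_copy = defensive_copy
        self.buffer = []

    def write(self, row):
        # Without a copy, every buffered entry aliases the same object.
        self.buffer.append(row.copy() if self.defensive_copy else row)


def feed(writer, records):
    """Send records through one reused row, then report what was buffered."""
    row = MutableRow()
    for record in records:
        row.values = tuple(record)  # overwrite the shared object in place
        writer.write(row)
    return [buffered.values for buffered in writer.buffer]
```

A writer that streams each row straight to disk never observes the problem, which is why the copy can be left to the few writers that buffer.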
[GitHub] spark pull request #21948: [SPARK-24991][SQL] use InternalRow in DataSourceW...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21948#discussion_r207293465 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java --- @@ -50,4 +50,15 @@ *this ID will always be 0. */ DataWriter createDataWriter(int partitionId, long taskId, long epochId); + + /** + * When true, Spark will reuse the same data object instance when sending data to the data writer, + * for better performance. Data writers should carefully handle the data objects if it's reused, + * e.g. do not buffer the data objects in a list. By default it returns false for safety, data --- End diff -- No, Iceberg assumes that data objects are reused.
[GitHub] spark issue #21911: [SPARK-24940][SQL] Coalesce and Repartition Hint for SQL...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21911 I'd like to fix the AnalysisError message and I noted one small nit in the tests. +1 when the AnalysisError message is fixed.
[GitHub] spark pull request #21911: [SPARK-24940][SQL] Coalesce and Repartition Hint ...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21911#discussion_r207285870 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala --- @@ -17,15 +17,25 @@ package org.apache.spark.sql.catalyst.analysis +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.catalyst.plans.logical._ class ResolveHintsSuite extends AnalysisTest { import org.apache.spark.sql.catalyst.analysis.TestRelations._ + private def intercept(plan: LogicalPlan, messages: String*): Unit = { --- End diff -- Since this is an assertion and `intercept` is a well-known method, can you rename it to `assertErrorMessage` or something more descriptive?
[GitHub] spark pull request #21911: [SPARK-24940][SQL] Coalesce and Repartition Hint ...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21911#discussion_r207285266 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala --- @@ -102,6 +104,35 @@ object ResolveHints { } } + /** + * COALESCE Hint accepts name "COALESCE" and "REPARTITION". + * Its parameter includes a partition number. + */ + class ResolveCoalesceHints(conf: SQLConf) extends Rule[LogicalPlan] { +private val COALESCE_HINT_NAMES = Set("COALESCE", "REPARTITION") + +private def applyCoalesceHint( + plan: LogicalPlan, + numPartitions: Int, + shuffle: Boolean): LogicalPlan = { + Repartition(numPartitions, shuffle, plan) +} + +def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { + case h: UnresolvedHint if COALESCE_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) => +h.parameters match { + case Seq(Literal(numPartitions: Int, IntegerType)) => +val shuffle = h.name.toUpperCase(Locale.ROOT) match { + case "REPARTITION" => true + case "COALESCE" => false +} +applyCoalesceHint(h.child, numPartitions, shuffle) + case _ => +throw new AnalysisException("COALESCE Hint expects a partition number as parameter") --- End diff -- Can you use `h.name.toUpperCase` in this error message instead? I think that would be a better message for users that don't know the relationship between COALESCE and REPARTITION.
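The suggestion in this comment, naming the hint the user actually wrote in the error message, can be sketched as follows. This is a Python stand-in for the Scala rule in the diff; the function name and the dictionary it returns are hypothetical.

```python
COALESCE_HINT_NAMES = {"COALESCE", "REPARTITION"}


def resolve_coalesce_hint(name, parameters):
    """Resolve a COALESCE or REPARTITION hint into repartition settings.

    Hypothetical helper: returns None for unrelated hints, and raises an
    error that mentions the hint name the user typed (upper-cased), not a
    fixed "COALESCE".
    """
    hint = name.upper()
    if hint not in COALESCE_HINT_NAMES:
        return None
    # type(...) is int deliberately rejects booleans as partition counts.
    if len(parameters) == 1 and type(parameters[0]) is int:
        return {"num_partitions": parameters[0], "shuffle": hint == "REPARTITION"}
    raise ValueError(f"{hint} Hint expects a partition number as parameter")
```

A user who wrote `REPARTITION('a')` then sees an error about REPARTITION and does not need to know that both hints are resolved by the same rule.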
[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21305 @cloud-fan, I'll fix the conflicts and re-run tests. Yesterday's tests passed after I updated for your feedback. I'd like to try to get this in soon because it is taking so much time to resolve conflicts without any real changes. FYI @gatorsmile, @bersprockets, @jzhuge
[GitHub] spark issue #21946: [SPARK-24990][SQL] merge ReadSupport and ReadSupportWith...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21946 +1
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r207043490 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2217,6 +2218,100 @@ class Analyzer( } } + /** + * Resolves columns of an output table from the data in a logical plan. This rule will: + * + * - Reorder columns when the write is by name + * - Insert safe casts when data types do not match + * - Insert aliases when column names do not match + * - Detect plans that are not compatible with the output table and throw AnalysisException + */ + object ResolveOutputRelation extends Rule[LogicalPlan] { +override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case append @ AppendData(table, query, isByName) + if table.resolved && query.resolved && !append.resolved => +val projection = resolveOutputColumns(table.name, table.output, query, isByName) + +if (projection != query) { + append.copy(query = projection) +} else { + append +} +} + +def resolveOutputColumns( +tableName: String, +expected: Seq[Attribute], +query: LogicalPlan, +byName: Boolean): LogicalPlan = { + + if (expected.size < query.output.size) { +throw new AnalysisException( + s"""Cannot write to '$tableName', too many data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) + } + + val errors = new mutable.ArrayBuffer[String]() + val resolved: Seq[NamedExpression] = if (byName) { +expected.flatMap { outAttr => + query.resolveQuoted(outAttr.name, resolver) match { +case Some(inAttr) if inAttr.nullable && !outAttr.nullable => + errors += s"Cannot write nullable values to non-null column '${outAttr.name}'" + None + +case Some(inAttr) if !DataType.canWrite(outAttr.dataType, inAttr.dataType, resolver) => + Some(upcast(inAttr, outAttr)) + +case Some(inAttr) => + Some(inAttr) // matches nullability, datatype, and name + +case _ => + errors += 
s"Cannot find data for output column '${outAttr.name}'" + None + } +} + + } else { +if (expected.size > query.output.size) { + throw new AnalysisException( +s"""Cannot write to '$tableName', not enough data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) +} + +query.output.zip(expected).flatMap { + case (inAttr, outAttr) if inAttr.nullable && !outAttr.nullable => +errors += s"Cannot write nullable values to non-null column '${outAttr.name}'" +None + + case (inAttr, outAttr) +if !DataType.canWrite(inAttr.dataType, outAttr.dataType, resolver) || --- End diff -- I updated this to use your suggestion: now it always adds the cast. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r207043344 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2217,6 +2218,100 @@ class Analyzer( } } + /** + * Resolves columns of an output table from the data in a logical plan. This rule will: + * + * - Reorder columns when the write is by name + * - Insert safe casts when data types do not match + * - Insert aliases when column names do not match + * - Detect plans that are not compatible with the output table and throw AnalysisException + */ + object ResolveOutputRelation extends Rule[LogicalPlan] { +override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case append @ AppendData(table, query, isByName) + if table.resolved && query.resolved && !append.resolved => +val projection = resolveOutputColumns(table.name, table.output, query, isByName) + +if (projection != query) { + append.copy(query = projection) +} else { + append +} +} + +def resolveOutputColumns( +tableName: String, +expected: Seq[Attribute], +query: LogicalPlan, +byName: Boolean): LogicalPlan = { + + if (expected.size < query.output.size) { +throw new AnalysisException( + s"""Cannot write to '$tableName', too many data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) + } + + val errors = new mutable.ArrayBuffer[String]() + val resolved: Seq[NamedExpression] = if (byName) { +expected.flatMap { outAttr => + query.resolveQuoted(outAttr.name, resolver) match { +case Some(inAttr) if inAttr.nullable && !outAttr.nullable => + errors += s"Cannot write nullable values to non-null column '${outAttr.name}'" + None + +case Some(inAttr) if !DataType.canWrite(outAttr.dataType, inAttr.dataType, resolver) => + Some(upcast(inAttr, outAttr)) + +case Some(inAttr) => + Some(inAttr) // matches nullability, datatype, and name + +case _ => + errors += 
s"Cannot find data for output column '${outAttr.name}'" + None + } +} + + } else { +if (expected.size > query.output.size) { + throw new AnalysisException( +s"""Cannot write to '$tableName', not enough data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) +} + +query.output.zip(expected).flatMap { --- End diff -- I refactored these into a helper method. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r207043203 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2217,6 +2218,100 @@ class Analyzer( } } + /** + * Resolves columns of an output table from the data in a logical plan. This rule will: + * + * - Reorder columns when the write is by name + * - Insert safe casts when data types do not match + * - Insert aliases when column names do not match + * - Detect plans that are not compatible with the output table and throw AnalysisException + */ + object ResolveOutputRelation extends Rule[LogicalPlan] { +override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case append @ AppendData(table, query, isByName) + if table.resolved && query.resolved && !append.resolved => +val projection = resolveOutputColumns(table.name, table.output, query, isByName) + +if (projection != query) { + append.copy(query = projection) +} else { + append +} +} + +def resolveOutputColumns( +tableName: String, +expected: Seq[Attribute], +query: LogicalPlan, +byName: Boolean): LogicalPlan = { + + if (expected.size < query.output.size) { +throw new AnalysisException( + s"""Cannot write to '$tableName', too many data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) + } + + val errors = new mutable.ArrayBuffer[String]() + val resolved: Seq[NamedExpression] = if (byName) { +expected.flatMap { outAttr => + query.resolveQuoted(outAttr.name, resolver) match { +case Some(inAttr) if inAttr.nullable && !outAttr.nullable => --- End diff -- I fixed this by always failing if `canWrite` returns false and always adding the `UpCast`. Now, `canWrite` will return true if the write type can be cast to the read type for atomic types, as determined by `Cast.canSafeCast`. 
Since it only returns a boolean, we always insert the cast and the optimizer should remove it if it isn't needed. I also added better error messages. When an error is found, the check will add a clear error message by calling `addError: String => Unit`.
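The approach described in this comment can be sketched on a toy type model: the write check returns only a boolean, reports the reason through an `addError: String => Unit` callback, and every writable column is wrapped in a cast whether or not it is needed. Everything here (`SimpleType`, `canSafeCast`, `UpCast`, `resolve`) is a hypothetical simplification; Spark's real `DataType.canWrite` and `Cast.canSafeCast` handle many more types and have different signatures.

```scala
import scala.collection.mutable.ArrayBuffer

object CanWriteSketch {
  // Toy type model; Spark's DataType hierarchy is much richer.
  sealed trait SimpleType
  case object IntT extends SimpleType
  case object LongT extends SimpleType
  case object FloatT extends SimpleType
  case object DoubleT extends SimpleType
  case object StringT extends SimpleType

  // Assumed safe casts for this sketch: identity plus two widening promotions.
  def canSafeCast(from: SimpleType, to: SimpleType): Boolean = (from, to) match {
    case (IntT, LongT) | (FloatT, DoubleT) => true
    case _ => from == to
  }

  // Returns only a boolean; the reason for a failure goes through addError.
  def canWrite(write: SimpleType, read: SimpleType, column: String,
      addError: String => Unit): Boolean = {
    val ok = canSafeCast(write, read)
    if (!ok) addError(s"Cannot write '$column': $write cannot be safely cast to $read")
    ok
  }

  // Simplified stand-in for an UpCast expression wrapped around a data column.
  final case class UpCast(column: String, to: SimpleType)

  // Every writable column gets a cast, even when the types already match.
  def resolve(data: Seq[(String, SimpleType)],
      table: Seq[(String, SimpleType)]): Either[Seq[String], Seq[UpCast]] = {
    val errors = ArrayBuffer.empty[String]
    val casts = data.zip(table).flatMap { case ((name, writeType), (_, readType)) =>
      val ok = canWrite(writeType, readType, name, msg => { errors += msg; () })
      if (ok) Some(UpCast(name, readType)) else None
    }
    if (errors.isEmpty) Right(casts) else Left(errors.toList)
  }
}
```

Collecting all messages before failing lets a single error report list every incompatible column instead of stopping at the first one.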
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r207015951 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2217,6 +2218,100 @@ class Analyzer( } } + /** + * Resolves columns of an output table from the data in a logical plan. This rule will: + * + * - Reorder columns when the write is by name + * - Insert safe casts when data types do not match + * - Insert aliases when column names do not match + * - Detect plans that are not compatible with the output table and throw AnalysisException + */ + object ResolveOutputRelation extends Rule[LogicalPlan] { +override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case append @ AppendData(table, query, isByName) + if table.resolved && query.resolved && !append.resolved => +val projection = resolveOutputColumns(table.name, table.output, query, isByName) + +if (projection != query) { + append.copy(query = projection) +} else { + append +} +} + +def resolveOutputColumns( +tableName: String, +expected: Seq[Attribute], +query: LogicalPlan, +byName: Boolean): LogicalPlan = { + + if (expected.size < query.output.size) { +throw new AnalysisException( + s"""Cannot write to '$tableName', too many data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) + } + + val errors = new mutable.ArrayBuffer[String]() + val resolved: Seq[NamedExpression] = if (byName) { +expected.flatMap { outAttr => + query.resolveQuoted(outAttr.name, resolver) match { +case Some(inAttr) if inAttr.nullable && !outAttr.nullable => + errors += s"Cannot write nullable values to non-null column '${outAttr.name}'" + None + +case Some(inAttr) if !DataType.canWrite(outAttr.dataType, inAttr.dataType, resolver) => + Some(upcast(inAttr, outAttr)) + +case Some(inAttr) => + Some(inAttr) // matches nullability, datatype, and name + +case _ => + errors += 
s"Cannot find data for output column '${outAttr.name}'" + None + } +} + + } else { +if (expected.size > query.output.size) { + throw new AnalysisException( +s"""Cannot write to '$tableName', not enough data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) +} + +query.output.zip(expected).flatMap { + case (inAttr, outAttr) if inAttr.nullable && !outAttr.nullable => +errors += s"Cannot write nullable values to non-null column '${outAttr.name}'" +None + + case (inAttr, outAttr) +if !DataType.canWrite(inAttr.dataType, outAttr.dataType, resolver) || --- End diff -- Sorry, I thought you were asking whether upcast is necessary. We could probably always upcast. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanRow
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21921 Yeah, I'd say that if it isn't documented, then let's go with the usual RTC conventions.
[GitHub] spark issue #21948: [SPARK-24991][SQL] use InternalRow in DataSourceWriter
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21948 +1
[GitHub] spark pull request #21948: [SPARK-24991][SQL] use InternalRow in DataSourceW...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21948#discussion_r207006871 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala --- @@ -89,8 +89,7 @@ class RateStreamContinuousReader(options: DataSourceOptions) extends ContinuousR start.runTimeMs, i, numPartitions, -perPartitionRate) -.asInstanceOf[InputPartition[InternalRow]] +perPartitionRate): InputPartition[InternalRow] --- End diff -- Sounds good to me.
[GitHub] spark issue #21946: [SPARK-24990][SQL] merge ReadSupport and ReadSupportWith...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21946 Yeah, I'm fine with this, then.
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r206979097 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java --- @@ -38,15 +38,16 @@ * If this method fails (by throwing an exception), the action will fail and no Spark job will be * submitted. * - * @param jobId A unique string for the writing job. It's possible that there are many writing - * jobs running at the same time, and the returned {@link DataSourceWriter} can - * use this job id to distinguish itself from other jobs. + * @param writeUUID A unique string for the writing job. It's possible that there are many writing + * jobs running at the same time, and the returned {@link DataSourceWriter} can + * use this job id to distinguish itself from other jobs. * @param schema the schema of the data to be written. * @param mode the save mode which determines what to do when the data are already in this data * source, please refer to {@link SaveMode} for more details. * @param options the options for the returned data source writer, which is an immutable *case-insensitive string-to-string map. + * @return a writer to append data to this data source --- End diff -- The data source API only handles writes/appends and reads. The high-level logical plans combine append writes with other operations.
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r206978690 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java --- @@ -38,15 +38,16 @@ * If this method fails (by throwing an exception), the action will fail and no Spark job will be * submitted. * - * @param jobId A unique string for the writing job. It's possible that there are many writing - * jobs running at the same time, and the returned {@link DataSourceWriter} can - * use this job id to distinguish itself from other jobs. + * @param writeUUID A unique string for the writing job. It's possible that there are many writing --- End diff -- This is removed in the v2 API redesign.
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r206978506 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -352,6 +351,36 @@ case class Join( } } +/** + * Append data to an existing table. + */ +case class AppendData( +table: NamedRelation, +query: LogicalPlan, +isByName: Boolean) extends LogicalPlan { + override def children: Seq[LogicalPlan] = Seq(query) --- End diff -- What transforms would be enabled by making `table` a child? Do we want to transform the relation? It is fixed so I didn't think that was a good idea. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r206978261 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2217,6 +2218,100 @@ class Analyzer( } } + /** + * Resolves columns of an output table from the data in a logical plan. This rule will: + * + * - Reorder columns when the write is by name + * - Insert safe casts when data types do not match + * - Insert aliases when column names do not match + * - Detect plans that are not compatible with the output table and throw AnalysisException + */ + object ResolveOutputRelation extends Rule[LogicalPlan] { +override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case append @ AppendData(table, query, isByName) + if table.resolved && query.resolved && !append.resolved => +val projection = resolveOutputColumns(table.name, table.output, query, isByName) + +if (projection != query) { + append.copy(query = projection) +} else { + append +} +} + +def resolveOutputColumns( +tableName: String, +expected: Seq[Attribute], +query: LogicalPlan, +byName: Boolean): LogicalPlan = { + + if (expected.size < query.output.size) { +throw new AnalysisException( + s"""Cannot write to '$tableName', too many data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) + } + + val errors = new mutable.ArrayBuffer[String]() + val resolved: Seq[NamedExpression] = if (byName) { +expected.flatMap { outAttr => + query.resolveQuoted(outAttr.name, resolver) match { +case Some(inAttr) if inAttr.nullable && !outAttr.nullable => + errors += s"Cannot write nullable values to non-null column '${outAttr.name}'" + None + +case Some(inAttr) if !DataType.canWrite(outAttr.dataType, inAttr.dataType, resolver) => + Some(upcast(inAttr, outAttr)) + +case Some(inAttr) => + Some(inAttr) // matches nullability, datatype, and name + +case _ => + errors += 
s"Cannot find data for output column '${outAttr.name}'" + None + } +} + + } else { +if (expected.size > query.output.size) { + throw new AnalysisException( +s"""Cannot write to '$tableName', not enough data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) +} + +query.output.zip(expected).flatMap { + case (inAttr, outAttr) if inAttr.nullable && !outAttr.nullable => +errors += s"Cannot write nullable values to non-null column '${outAttr.name}'" +None + + case (inAttr, outAttr) +if !DataType.canWrite(inAttr.dataType, outAttr.dataType, resolver) || --- End diff -- The contract of `DataType.canWrite` is that the data written is compatible with the table's read schema. It should allow promotion from `int` to `long` and `float` to `double` and then use upcast here to write the correct type. I think this is a problem with `canWrite`, not the upcast. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r206977289 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2217,6 +2218,100 @@ class Analyzer( } } + /** + * Resolves columns of an output table from the data in a logical plan. This rule will: + * + * - Reorder columns when the write is by name + * - Insert safe casts when data types do not match + * - Insert aliases when column names do not match + * - Detect plans that are not compatible with the output table and throw AnalysisException + */ + object ResolveOutputRelation extends Rule[LogicalPlan] { +override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case append @ AppendData(table, query, isByName) + if table.resolved && query.resolved && !append.resolved => +val projection = resolveOutputColumns(table.name, table.output, query, isByName) + +if (projection != query) { + append.copy(query = projection) +} else { + append +} +} + +def resolveOutputColumns( +tableName: String, +expected: Seq[Attribute], +query: LogicalPlan, +byName: Boolean): LogicalPlan = { + + if (expected.size < query.output.size) { +throw new AnalysisException( + s"""Cannot write to '$tableName', too many data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) + } + + val errors = new mutable.ArrayBuffer[String]() + val resolved: Seq[NamedExpression] = if (byName) { +expected.flatMap { outAttr => + query.resolveQuoted(outAttr.name, resolver) match { +case Some(inAttr) if inAttr.nullable && !outAttr.nullable => + errors += s"Cannot write nullable values to non-null column '${outAttr.name}'" + None + +case Some(inAttr) if !DataType.canWrite(outAttr.dataType, inAttr.dataType, resolver) => + Some(upcast(inAttr, outAttr)) + +case Some(inAttr) => + Some(inAttr) // matches nullability, datatype, and name + +case _ => + errors += 
s"Cannot find data for output column '${outAttr.name}'" + None + } +} + + } else { +if (expected.size > query.output.size) { + throw new AnalysisException( +s"""Cannot write to '$tableName', not enough data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) +} + +query.output.zip(expected).flatMap { --- End diff -- This handles both append cases, write by name and write by position. This block is checking by position. I'll see if I can refactor the checks into a private method. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
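The two append cases mentioned here, write by name and write by position, can be contrasted with a small sketch over plain column names. `ColumnMatchingSketch`, `byName`, and `byPosition` are hypothetical names, the case-insensitive lookup is an assumption (Spark's resolver is configurable), and the error strings are abbreviated versions of the ones in the diff.

```scala
object ColumnMatchingSketch {
  // Left: accumulated error messages. Right: (dataColumn, tableColumn) pairs in table order.
  type Matches = Either[Seq[String], Seq[(String, String)]]

  // By name: walk the table columns in order and look each one up in the data.
  def byName(table: Seq[String], data: Seq[String]): Matches = {
    val lookups = table.map(t => t -> data.find(_.equalsIgnoreCase(t)))
    val errors = lookups.collect {
      case (t, None) => s"Cannot find data for output column '$t'"
    }
    if (errors.nonEmpty) Left(errors)
    else Right(lookups.collect { case (t, Some(d)) => d -> t })
  }

  // By position: pair columns up in order; names play no role.
  def byPosition(table: Seq[String], data: Seq[String]): Matches =
    if (data.size > table.size) Left(Seq("too many data columns"))
    else if (data.size < table.size) Left(Seq("not enough data columns"))
    else Right(data.zip(table))
}
```

With table columns `id, name` and data columns `name, id`, `byName` reorders the data to match the table, while `byPosition` pairs `name` with `id` and `id` with `name`; the per-column type and nullability checks then run on those pairs in either mode, which is why a shared helper is a natural refactoring.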
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r206976856 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -2217,6 +2218,100 @@ class Analyzer( } } + /** + * Resolves columns of an output table from the data in a logical plan. This rule will: + * + * - Reorder columns when the write is by name + * - Insert safe casts when data types do not match + * - Insert aliases when column names do not match + * - Detect plans that are not compatible with the output table and throw AnalysisException + */ + object ResolveOutputRelation extends Rule[LogicalPlan] { +override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case append @ AppendData(table, query, isByName) + if table.resolved && query.resolved && !append.resolved => +val projection = resolveOutputColumns(table.name, table.output, query, isByName) + +if (projection != query) { + append.copy(query = projection) +} else { + append +} +} + +def resolveOutputColumns( +tableName: String, +expected: Seq[Attribute], +query: LogicalPlan, +byName: Boolean): LogicalPlan = { + + if (expected.size < query.output.size) { +throw new AnalysisException( + s"""Cannot write to '$tableName', too many data columns: + |Table columns: ${expected.map(_.name).mkString(", ")} + |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin) + } + + val errors = new mutable.ArrayBuffer[String]() + val resolved: Seq[NamedExpression] = if (byName) { +expected.flatMap { outAttr => + query.resolveQuoted(outAttr.name, resolver) match { +case Some(inAttr) if inAttr.nullable && !outAttr.nullable => --- End diff -- This is done by `DataType.canWrite` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21946: [SPARK-24990][SQL] merge ReadSupport and ReadSupportWith...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21946 @cloud-fan, from your comment around the same time as mine, it sounds like the confusion may just be in how you're updating the current API to the proposed one. Can you post a migration plan? It sounds like something like this: `ReadSupport` and `ReadSupportWithSchema` -> `BatchReadSupportProvider` `DataSourceReader` -> `ReadSupport` Is that right? The re-use of `ReadSupport` would explain the confusion on my end.
[GitHub] spark issue #21946: [SPARK-24990][SQL] merge ReadSupport and ReadSupportWith...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21946 Isn't this unnecessary after the API redesign? For the redesign, the `DataSourceV2` or a `ReadSupportProvider` will supply a `create` method (or `anonymousTable`) to return a `Table` that implements `ReadSupport`. `ReadSupport` should not accept user schemas because the schema should be accessible from the `Table` itself. That way, we can use the same table-based relation (see https://github.com/apache/spark/pull/21877/files#diff-35ba4ffb5ccb9b18b43226f1d5effa23R82).
[GitHub] spark issue #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanRow
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21921 @cloud-fan, @gatorsmile, I'm fine with that if it's documented somewhere. I wasn't aware of that convention and no one brought it up the last time I pointed out commits without a committer +1.
[GitHub] spark issue #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanRow
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21921 This looks fine other than the possibly unnecessary cast.
[GitHub] spark pull request #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanR...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21921#discussion_r206946453 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala --- @@ -416,12 +405,12 @@ class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport { requiredSchema } -override def planRowInputPartitions(): JList[InputPartition[Row]] = { - val lowerBound = filters.collect { +override def planInputPartitions(): JList[InputPartition[InternalRow]] = { + val lowerBound = filters.collectFirst { --- End diff -- Fine by me since it is so small, just wanted to point it out.
[GitHub] spark pull request #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanR...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21921#discussion_r206946335 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala --- @@ -121,17 +121,6 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { } } - test("unsafe row scan implementation") { -Seq(classOf[UnsafeRowDataSourceV2], classOf[JavaUnsafeRowDataSourceV2]).foreach { cls => --- End diff -- Ok.
[GitHub] spark pull request #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanR...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21921#discussion_r206946259 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala --- @@ -169,7 +170,7 @@ class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation: (0 until numPartitions).map { p => new RateStreamMicroBatchInputPartition( p, numPartitions, rangeStart, rangeEnd, localStartTimeMs, relativeMsPerValue) -: InputPartition[Row] +: InputPartition[InternalRow] --- End diff -- This is fine since it isn't a cast, but it's generally better to check whether these are still necessary after refactoring. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanR...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21921#discussion_r206946076 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala --- @@ -91,7 +90,7 @@ class RateStreamContinuousReader(options: DataSourceOptions) i, numPartitions, perPartitionRate) -.asInstanceOf[InputPartition[Row]] +.asInstanceOf[InputPartition[InternalRow]] --- End diff -- I don't think it's a good idea to leave casts. Can you check to see if this can be avoided? I found in #21118 that many of the casts were unnecessary if variables had declared types and it is much better to avoid explicit casts that work around the type system. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
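To make the point about casts concrete, here is a minimal, dependency-free sketch contrasting a type ascription or declared type, which the compiler checks, with `asInstanceOf`, which it does not. `InputPartition`, `InternalRow`, and `RatePartition` below are simplified hypothetical stand-ins for the Spark classes in the diff, not their real definitions.

```scala
object AscriptionSketch {
  trait InputPartition[T] { def get(): T }
  final case class InternalRow(values: Seq[Any])

  final class RatePartition(value: Long) extends InputPartition[InternalRow] {
    def get(): InternalRow = InternalRow(Seq(value))
  }

  // Type ascription: an upcast that the compiler verifies.
  val ascribed = new RatePartition(1L): InputPartition[InternalRow]

  // Declared type: the widening to InputPartition[InternalRow] is also verified.
  val declared: Seq[InputPartition[InternalRow]] =
    (0 until 3).map(i => new RatePartition(i.toLong))

  // A cast compiles even with the wrong type argument, because of erasure...
  val miscast = new RatePartition(1L).asInstanceOf[InputPartition[String]]

  // ...and only fails later, when the element is actually used as a String.
  def useMiscast(): Int = miscast.get().length
}
```

Had the ascription named the wrong element type, for example `InputPartition[String]`, the code would not compile, whereas the equivalent cast is accepted and the mistake surfaces at runtime, far from its cause.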
[GitHub] spark pull request #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanR...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21921#discussion_r206936701 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala --- @@ -456,57 +445,20 @@ class AdvancedInputPartition(start: Int, end: Int, requiredSchema: StructType) current < end } - override def get(): Row = { + override def get(): InternalRow = { val values = requiredSchema.map(_.name).map { case "i" => current case "j" => -current } -Row.fromSeq(values) +InternalRow.fromSeq(values) } } -class UnsafeRowDataSourceV2 extends DataSourceV2 with ReadSupport { --- End diff -- These aren't Row implementations. Why remove them? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanR...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21921#discussion_r206936422 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala --- @@ -416,12 +405,12 @@ class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport { requiredSchema } -override def planRowInputPartitions(): JList[InputPartition[Row]] = { - val lowerBound = filters.collect { +override def planInputPartitions(): JList[InputPartition[InternalRow]] = { + val lowerBound = filters.collectFirst { --- End diff -- Nit: this is an unrelated change.
[GitHub] spark pull request #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanR...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21921#discussion_r206936202 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala --- @@ -121,17 +121,6 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { } } - test("unsafe row scan implementation") { -Seq(classOf[UnsafeRowDataSourceV2], classOf[JavaUnsafeRowDataSourceV2]).foreach { cls => --- End diff -- Why remove unsafe tests? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanR...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21921#discussion_r206935616 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala --- @@ -169,7 +170,7 @@ class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation: (0 until numPartitions).map { p => new RateStreamMicroBatchInputPartition( p, numPartitions, rangeStart, rangeEnd, localStartTimeMs, relativeMsPerValue) -: InputPartition[Row] +: InputPartition[InternalRow] --- End diff -- Is this needed? Doesn't RateStreamMicroBatchInputPartition implement InputPartition[InternalRow]? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanR...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21921#discussion_r206935176 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala --- @@ -91,7 +90,7 @@ class RateStreamContinuousReader(options: DataSourceOptions) i, numPartitions, perPartitionRate) -.asInstanceOf[InputPartition[Row]] +.asInstanceOf[InputPartition[InternalRow]] --- End diff -- Why is this cast necessary? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanRow
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21921 @cloud-fan, I thought it was a requirement to have a committer +1 before merging. Or is this [list of committers](https://spark.apache.org/committers.html) out of date?
[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21305 @cloud-fan, I'll look into the test failures tomorrow, but this has been passing tests for weeks so I think it is still safe to review when you have time. We can fix both in parallel so that we can get validated writes in 2.4.
[GitHub] spark pull request #21911: [SPARK-24940][SQL] Coalesce Hint for SQL Queries
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21911#discussion_r206741523 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala --- @@ -660,6 +660,62 @@ class PlanParserSuite extends AnalysisTest { ) } + test ("insert hint syntax") { +assertEqual( + "INSERT INTO s /*+ COALESCE(10) */ SELECT * FROM t", + InsertIntoTable(table("s"), Map.empty, +UnresolvedHint("COALESCE", Seq(Literal(10)), + table("t").select(star())), false, false)) +assertEqual( + "INSERT INTO TABLE s /*+ COALESCE(50, true) */ SELECT * FROM t", + InsertIntoTable(table("s"), Map.empty, +UnresolvedHint("COALESCE", Seq(Literal(50), Literal(true)), + table("t").select(star())), false, false)) +assertEqual( + "INSERT INTO s /*+ REPARTITION(100) */ SELECT * FROM t", + InsertIntoTable(table("s"), Map.empty, +UnresolvedHint("REPARTITION", Seq(Literal(100)), + table("t").select(star())), false, false)) +assertEqual( + "INSERT INTO TABLE s /*+ REPARTITION(20, false) */ SELECT * FROM t", + InsertIntoTable(table("s"), Map.empty, +UnresolvedHint("REPARTITION", Seq(Literal(20), Literal(false)), + table("t").select(star())), false, false)) +assertEqual( + "INSERT OVERWRITE TABLE s /*+ COALESCE(10) */ SELECT * FROM t", + InsertIntoTable(table("s"), Map.empty, +UnresolvedHint("COALESCE", Seq(Literal(10)), + table("t").select(star())), true, false)) +assertEqual( + "INSERT OVERWRITE TABLE s /*+ COALESCE(50, true) */ SELECT * FROM t", + InsertIntoTable(table("s"), Map.empty, +UnresolvedHint("COALESCE", Seq(Literal(50), Literal(true)), + table("t").select(star())), true, false)) +assertEqual( + "INSERT OVERWRITE TABLE s /*+ REPARTITION(100) */ SELECT * FROM t", + InsertIntoTable(table("s"), Map.empty, +UnresolvedHint("REPARTITION", Seq(Literal(100)), + table("t").select(star())), true, false)) +assertEqual( + "INSERT OVERWRITE TABLE s /*+ REPARTITION(20, false) */ SELECT * FROM t", + InsertIntoTable(table("s"), Map.empty, 
+UnresolvedHint("REPARTITION", Seq(Literal(20), Literal(false)), + table("t").select(star())), true, false)) + +// Multiple hints +assertEqual( + "INSERT INTO s /*+ REPARTITION(100), COALESCE(50, true), COALESCE(10) */ SELECT * FROM t", + InsertIntoTable(table("s"), Map.empty, +UnresolvedHint("REPARTITION", Seq(Literal(100)), + UnresolvedHint("COALESCE", Seq(Literal(50), Literal(true)), +UnresolvedHint("COALESCE", Seq(Literal(10)), + table("t").select(star(), false, false)) + +// Wrong hint location +intercept("INSERT INTO /*+ COALESCE(10) */ s SELECT * FROM t", + "extraneous input '/*+' expecting") --- End diff -- I don't think it is necessary to test the wrong hint location because there are so many wrong hint locations and the error is just a generic parse error. If there were a friendly error message, that would be worth a test case. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21911: [SPARK-24940][SQL] Coalesce Hint for SQL Queries
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21911#discussion_r206741178 --- Diff: sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 --- @@ -332,7 +332,7 @@ resource ; queryNoWith -: insertInto? queryTerm queryOrganization #singleInsertQuery +: (insertInto (hints+=hint)*)? queryTerm queryOrganization #singleInsertQuery --- End diff -- Why allow more than one coalesce hint? Is this useful in some way? I think it is confusing to users to allow multiple hints because the order in which they are applied to the plan isn't clear. Is it left-to-right order? Would `REPARTITION(100), COALESCE(10)` repartition and then coalesce? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
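The ordering question can be made concrete with a small Scala sketch. It assumes hints are nested the way the parser test in this PR expects, with the leftmost hint as the outermost node. `Plan`, `Table`, `Hint`, and `nest` are hypothetical names for this example, not Catalyst classes.

```scala
// Hypothetical plan model for illustration only.
object HintOrderSketch {
  sealed trait Plan
  case class Table(name: String) extends Plan
  case class Hint(name: String, args: Seq[Int], child: Plan) extends Plan

  // Wrap the query so that the leftmost hint becomes the outermost node.
  // foldRight starts from the rightmost hint, which ends up closest to the query.
  def nest(hints: Seq[(String, Seq[Int])], query: Plan): Plan =
    hints.foldRight(query) { case ((name, args), child) => Hint(name, args, child) }
}
```

Under that assumption, `REPARTITION(100), COALESCE(10)` puts `COALESCE(10)` directly above the query, so a bottom-up reading of the plan coalesces first and repartitions second. That is the opposite of left-to-right, which is one way the order could surprise users.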
[GitHub] spark pull request #21911: [SPARK-24940][SQL] Coalesce Hint for SQL Queries
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21911#discussion_r206740826 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala --- @@ -102,6 +104,39 @@ object ResolveHints { } } + /** + * For coalesce hint, we accept "COALESCE" and "REPARTITION". + * Its parameters include a partition number and an optional boolean to indicate + * whether shuffle is allowed. + */ + class ResolveCoalesceHints(conf: SQLConf) extends Rule[LogicalPlan] { +private val COALESCE_HINT_NAMES = Set("COALESCE", "REPARTITION") + +private def applyCoalesceHint( + plan: LogicalPlan, + numPartitions: Int, + shuffle: Boolean): LogicalPlan = { + Repartition(numPartitions, shuffle, plan) +} + +def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { + case h: UnresolvedHint if COALESCE_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) => +h.parameters match { + case Seq(Literal(i, IntegerType)) => +val defaultShuffle = h.name.toUpperCase(Locale.ROOT) match { + case "REPARTITION" => true + case _ => false +} +applyCoalesceHint(h.child, i.asInstanceOf[Int], defaultShuffle) --- End diff -- Instead of casting, this should add the expected type to the pattern match. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
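A minimal sketch of the suggestion, in Scala. `Literal` and the `DataType` objects here are simplified stand-ins written for this example (the value is untyped, as in the diff); `numPartitions` is a hypothetical helper name.

```scala
// Hypothetical stand-ins for illustration; NOT the Catalyst classes.
object TypedPatternSketch {
  sealed trait DataType
  case object IntegerType extends DataType
  case object BooleanType extends DataType
  // The value is untyped (Any), which is what tempts callers to cast.
  case class Literal(value: Any, dataType: DataType)

  def numPartitions(params: Seq[Literal]): Option[Int] = params match {
    // `i: Int` is a typed pattern: it matches only when the value is an Int
    // and binds `i` as an Int, so no asInstanceOf is needed afterwards.
    case Seq(Literal(i: Int, IntegerType)) => Some(i)
    case _ => None
  }
}
```

With the typed pattern, a literal whose value is not actually an `Int` falls through to the default case instead of failing later with a `ClassCastException`.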
[GitHub] spark pull request #21911: [SPARK-24940][SQL] Coalesce Hint for SQL Queries
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21911#discussion_r206689783 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala --- @@ -102,6 +104,39 @@ object ResolveHints { } } + /** + * For coalesce hint, we accept "COALESCE" and "REPARTITION". + * Its parameters include a partition number and an optional boolean to indicate + * whether shuffle is allowed. + */ + class ResolveCoalesceHints(conf: SQLConf) extends Rule[LogicalPlan] { +private val COALESCE_HINT_NAMES = Set("COALESCE", "REPARTITION") + +private def applyCoalesceHint( + plan: LogicalPlan, + numPartitions: Int, + shuffle: Boolean): LogicalPlan = { + Repartition(numPartitions, shuffle, plan) +} + +def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { + case h: UnresolvedHint if COALESCE_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) => --- End diff -- I think this should match `COALESCE(num)` or `REPARTITION(num)` and drop the boolean. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21911: [SPARK-24940][SQL] Coalesce Hint for SQL Queries
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21911 @jzhuge, I think it is confusing that this hint exposes the `shuffle` boolean flag. The Spark API makes a clear distinction between `repartition` and `coalesce` where `coalesce` means that Spark won't add a shuffle. Adding the boolean here would allow users to coalesce with a shuffle (repartition) or repartition without a shuffle (coalesce). I can't think of a good reason to do that. The reason why there is a boolean `shuffle` param is to use the same plan node internally, but I don't think that detail should be leaked into SQL hints.
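One way to read this suggestion, sketched in Scala: the hint name alone decides whether a shuffle is added, and the boolean never appears in the hint syntax. `Repartition` below is a simplified stand-in for the internal plan node, and `resolve` is a hypothetical helper, not code from the PR.

```scala
import java.util.Locale

// Hypothetical sketch; Repartition is a stand-in for the internal plan node.
object CoalesceHintSketch {
  case class Repartition(numPartitions: Int, shuffle: Boolean)

  // The shuffle flag is derived from the hint name and is never user-supplied.
  def resolve(hintName: String, numPartitions: Int): Option[Repartition] =
    hintName.toUpperCase(Locale.ROOT) match {
      case "REPARTITION" => Some(Repartition(numPartitions, shuffle = true))
      case "COALESCE"    => Some(Repartition(numPartitions, shuffle = false))
      case _             => None // not a coalesce-style hint
    }
}
```

This keeps the shared plan node as an internal detail while the SQL surface mirrors the `repartition` and `coalesce` distinction described above.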
[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21305 @cloud-fan, @gatorsmile, this has been ready for final review for a while. Do you think you'll have some time to look at it?
[GitHub] spark issue #21308: SPARK-24253: Add DeleteSupport mix-in for DataSourceV2.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21308 #21888 shows how this is used to implement DELETE FROM.
[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21306 #21888 demonstrates how to add a `TableIdentifier` with a catalog element, `CatalogTableIdentifier`, and how to safely migrate from the old identifier to the new one with a catalog.
[GitHub] spark pull request #21888: [SPARK-24253][SQL][WIP] Implement DeleteFrom for ...
GitHub user rdblue opened a pull request: https://github.com/apache/spark/pull/21888 [SPARK-24253][SQL][WIP] Implement DeleteFrom for v2 tables ## What changes were proposed in this pull request? This adds support for DELETE FROM in SQL using the new DeleteFrom logical plan and v2 DeleteSupport mix-in. To identify the v2 table to delete data from, this uses the TableCatalog API introduced in #21306. TableIdentifier has been updated with a new superclass, CatalogTableIdentifier. Code paths that don't support identifiers with a catalog continue to use TableIdentifier, allowing a smooth transition even though some code assumes there is no support for multiple catalogs. UnresolvedRelation now supports CatalogTableIdentifier, but resolution will only happen when the catalog is not defined or through a new rule with support for v2 TableCatalog. Existing uses of UnresolvedRelation access the catalog using tableIdentifier, which asserts that the catalog is not defined before returning to ensure catalog identifiers don't leak to code without catalog support. WIP: This is based on #21306, #21305, and #21877 and includes the changes from those PRs. ## How was this patch tested? WIP, will add tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/spark SPARK-24253-add-delete-from Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21888.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21888 commit dc977d26acb29731f62addaec67ea6dbe077e670 Author: Ryan Blue Date: 2018-05-05T01:13:01Z SPARK-24252: Add v2 data source mix-in for catalog support. commit 7d75da5e6abc2bf90b8cf7f530412d6b01ee6938 Author: Ryan Blue Date: 2018-05-11T21:27:47Z SPARK-24252: Add copyright headers. 
commit 3be3b7c320c7f9b68270112c9d3f7fa0a1ff377c Author: Ryan Blue Date: 2018-07-04T17:02:52Z SPARK-24252: Update for review comments. * Rename CatalogSupport to TableSupport * Rename DataSourceCatalog to TableCatalog * Remove name and database from Table commit 7ad0c8e8fe45f1b85011fc91dd162dba3951593e Author: Ryan Blue Date: 2018-07-04T17:19:45Z SPARK-24252: Add TableChange example to Javadocs. commit 3de2cfa3d069aa905c2c5076acc8d59b5ee0c63e Author: Ryan Blue Date: 2018-07-25T18:11:22Z SPARK-24252: Update catalog API in org.apache.spark.sql.catalog.v2. commit ae61a797f584fba8ca2601b76f80774d22c3845c Author: Ryan Blue Date: 2018-07-25T19:25:32Z SPARK-24252: Add tests and clarify javadoc. commit 8b135607512fda53ff5adc34ac8b965b04f3a3d9 Author: Ryan Blue Date: 2018-07-25T20:00:23Z SPARK-24252: Fix Map.Entry reference causing doc failures. commit b4c505758a5137a311ac098965ace19af33bd198 Author: Ryan Blue Date: 2018-07-26T05:07:52Z SPARK-24252: Move catalog.v2 classes into catalyst. This is needed for catalyst plans to use catalogs. commit 85a49d729e0ae75cf6e1725216829c7d5e79a8a1 Author: Ryan Blue Date: 2018-05-07T15:54:37Z SPARK-24251: Add AppendData logical plan. This adds a new logical plan, AppendData, that was proposed in SPARK-23521. This also adds an analyzer rule to validate data written with AppendData against the target table. DataFrameWriter is also updated so that v2 writes use the new AppendData logical plan. commit 770a31cf43903736b10455a82b4743987dd27db6 Author: Ryan Blue Date: 2018-05-11T22:04:15Z SPARK-24253: Add DeleteSupport mix-in for DataSourceV2. commit f769341547cdc63f77456c1c2677070a4d3d363c Author: Ryan Blue Date: 2018-05-11T22:09:22Z SPARK-24253: Add interface description to DeleteSupport. commit af16c4238fd486a82e86dda02ad21279104afe21 Author: Ryan Blue Date: 2018-06-13T23:08:21Z Add DeleteFrom logical plan, add to parser. commit 88c12a47cbc3eea04ed9d90e91c62285de594b7e Author: Ryan Blue Date: 2018-07-25T18:11:45Z Add CTAS and RTAS support. 
This uses the catalog API introduced in SPARK-24252 to implement CTAS and RTAS plans. commit 19b83bd796406ccc69ed8a138ff7652bcb6049d1 Author: Ryan Blue Date: 2018-07-26T22:02:25Z SPARK-24253: Implement DELETE FROM for v2 tables.
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r205571827 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogProvider.java --- @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.sql.internal.SQLConf; + +/** + * A marker interface to provide a catalog implementation for Spark. + * + * Implementations can provide catalog functions by implementing additional interfaces, like + * {@link TableCatalog} to expose table operations. + * + * Catalog implementations must implement this marker interface to be loaded by + * {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the + * required public no-arg constructor. After creating an instance, it will be configured by calling + * {@link #initialize(CaseInsensitiveStringMap)}. + * + * Catalog implementations are registered to a name by adding a configuration option to Spark: + * {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. 
All configuration properties + * in the Spark configuration that share the catalog name prefix, + * {@code spark.sql.catalog.catalog-name.(key)=(value)} will be passed in the case insensitive + * string map of options in initialization with the prefix removed. An additional property, + * {@code name}, is also added to the options and will contain the catalog's name; in this case, + * "catalog-name". + */ +public interface CatalogProvider { + /** + * Called to initialize configuration. + * + * This method is called once, just after the provider is instantiated. + * + * @param options a case-insensitive string map of configuration + */ + void initialize(CaseInsensitiveStringMap options); --- End diff -- That's a Scala map and the v2 APIs are intended to be used with both Java and Scala. My intent is to reuse this map in place of DataSourceOptions, so at least we will reduce some duplication. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
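The Javadoc above describes how catalog options are derived from the Spark configuration. A rough Scala sketch of that derivation follows; `catalogOptions` is a hypothetical helper, a plain `Map` stands in for the case-insensitive string map, and the actual loader in the PR may differ.

```scala
// Hypothetical helper for illustration; a plain Map stands in for
// CaseInsensitiveStringMap, so key case handling is NOT modelled here.
object CatalogOptionsSketch {
  def catalogOptions(conf: Map[String, String], catalogName: String): Map[String, String] = {
    // Properties for this catalog share the prefix "spark.sql.catalog.<name>."
    val prefix = s"spark.sql.catalog.$catalogName."
    val stripped = conf.collect {
      // Keep only matching properties and remove the prefix from the key.
      case (key, value) if key.startsWith(prefix) => key.substring(prefix.length) -> value
    }
    // The catalog's own name is added as the extra "name" option.
    stripped + ("name" -> catalogName)
  }
}
```

Note that the registration key itself, `spark.sql.catalog.<name>`, has no trailing dot and so is not passed through as an option in this sketch.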
[GitHub] spark issue #12313: [SPARK-14543] [SQL] Improve InsertIntoTable column resol...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/12313 This is addressed by #21305.
[GitHub] spark pull request #12313: [SPARK-14543] [SQL] Improve InsertIntoTable colum...
Github user rdblue closed the pull request at: https://github.com/apache/spark/pull/12313
[GitHub] spark issue #21237: [SPARK-23325][WIP] Test parquet returning internal row
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21237 This is no longer needed. #21118 fixes the copy problem by always inserting a projection that copies, but delaying it until after filters are run.
[GitHub] spark pull request #21237: [SPARK-23325][WIP] Test parquet returning interna...
Github user rdblue closed the pull request at: https://github.com/apache/spark/pull/21237
[GitHub] spark issue #21877: [SPARK-24923][SQL][WIP] Add unpartitioned CTAS and RTAS ...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21877 @cloud-fan, @gatorsmile, @marmbrus, this PR demonstrates how plans would use the catalog changes introduced in #21306. To see the changes, you may want to look at just the last commit because this includes changes from other PRs.
[GitHub] spark pull request #21877: [SPARK-24923][SQL][WIP] Add unpartitioned CTAS an...
GitHub user rdblue opened a pull request: https://github.com/apache/spark/pull/21877 [SPARK-24923][SQL][WIP] Add unpartitioned CTAS and RTAS support for DataSourceV2 ## What changes were proposed in this pull request? * Remove extends from `ReadSupport` and `WriteSupport` classes for use with `Table` * Add CTAS and RTAS logical plans * Refactor physical write plans so AppendData, CTAS, and RTAS use the same base class * Add support for `TableCatalog` to `DataFrameReader` and `DataFrameWriter` * Add `TableV2Relation` for tables that are loaded by `TableCatalog` and have no `DataSource` instance * Move implicit helpers into `DataSourceV2Implicits` to avoid future churn Note that this doesn't handle `partitionBy` in `DataFrameWriter`. Adding support for partitioned tables will require validation rules. This is based on unmerged work and includes the commits from #21306 and #21305. ## How was this patch tested? Adding unit tests for CTAS and RTAS. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rdblue/spark add-ctas-rtas-v2-plans Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21877.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21877 commit 8372f5bb47a0d6269bb16b3dc16f6f3278d2f5fd Author: Ryan Blue Date: 2018-05-05T01:13:01Z SPARK-24252: Add v2 data source mix-in for catalog support. commit 1238af73872b0105d0c5dfbbd8da5c8f18afe408 Author: Ryan Blue Date: 2018-05-07T15:54:37Z SPARK-24251: Add AppendData logical plan. This adds a new logical plan, AppendData, that was proposed in SPARK-23521. This also adds an analyzer rule to validate data written with AppendData against the target table. DataFrameWriter is also updated so that v2 writes use the new AppendData logical plan. 
commit d308d3c75f78242c822eab6d11fb651d94f10aa6 Author: Ryan Blue Date: 2018-07-25T18:11:45Z Add CTAS and RTAS support. This uses the catalog API introduced in SPARK-24252 to implement CTAS and RTAS plans.
[GitHub] spark pull request #21306: [SPARK-24252][SQL] Add catalog registration and t...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21306#discussion_r205237682 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala --- @@ -609,6 +611,12 @@ class SparkSession private( */ @transient lazy val catalog: Catalog = new CatalogImpl(self) + @transient private lazy val catalogs = new mutable.HashMap[String, CatalogProvider]() + + private[sql] def catalog(name: String): CatalogProvider = synchronized { --- End diff -- Note that this is `private[sql]`. This allows us to use the named `TableCatalog` instances without solving how multiple catalogs should be exposed to users through a public API just yet. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
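The body of `catalog(name)` is cut off in the diff above. One plausible shape, shown here purely as an assumption, is a synchronized lookup that loads each named catalog once and caches it. `Session`, `load`, and the one-method `CatalogProvider` trait are hypothetical stand-ins for this Scala sketch.

```scala
import scala.collection.mutable

// Hypothetical sketch; Session and this CatalogProvider are stand-ins,
// and the caching behaviour is an assumption, not taken from the PR.
object CatalogCacheSketch {
  trait CatalogProvider { def name: String }

  class Session(load: String => CatalogProvider) {
    private val catalogs = new mutable.HashMap[String, CatalogProvider]()

    // synchronized so concurrent callers cannot load the same catalog twice.
    def catalog(name: String): CatalogProvider = synchronized {
      catalogs.getOrElseUpdate(name, load(name))
    }
  }
}
```

In the PR the accessor is `private[sql]`, so a cache like this would stay internal until a public API for multiple catalogs is settled.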
[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21306 @marmbrus, @cloud-fan, @gatorsmile, I've updated this PR to use reflection to instantiate catalogs. This allows implementations to provide named catalogs (and reuse implementations) and configure those catalogs with Spark configuration properties. FYI @bersprockets, @felixcheung, @jzhuge
[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21305 @cloud-fan, @gatorsmile, is it possible to get this in for 2.4? This validates writes to data source tables so I think it is a good one to have.
[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21305 Retest this please.
[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21118 Thanks for reviewing and merging @cloud-fan, @gatorsmile, @felixcheung!
[GitHub] spark issue #21306: [SPARK-24252][SQL] Add DataSourceV2 mix-in for table cat...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21306 @cloud-fan, @gatorsmile, I don't think this should be merged yet. I've been implementing CTAS and RTAS based on this commit and I don't think it makes sense to get a `TableCatalog` instance from the data source. The data source should be determined by the catalog, not the other way around. Otherwise, we could have a case where a `test` catalog uses the `parquet` source, but that `parquet` source would return a `prod` catalog for its `TableCatalog` because `prod` is the default. If catalogs can reuse data sources, then the catalog should be determined first. FYI @bersprockets, @felixcheung, @henryr
[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21118 @cloud-fan, any update on merging this?
[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21118 Rebased on master to fix conflicts.
[GitHub] spark pull request #21118: SPARK-23325: Use InternalRow when reading with Da...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21118#discussion_r204150008 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala --- @@ -53,7 +54,7 @@ class KafkaContinuousReader( metadataPath: String, initialOffsets: KafkaOffsetRangeLimit, failOnDataLoss: Boolean) - extends ContinuousReader with SupportsScanUnsafeRow with Logging { --- End diff -- We can, but this is intended to make minimal changes. We can add optimizations like this in a follow-up. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21118: SPARK-23325: Use InternalRow when reading with Da...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21118#discussion_r204149889 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala --- @@ -125,16 +125,13 @@ object DataSourceV2Strategy extends Strategy { val filterCondition = postScanFilters.reduceLeftOption(And) val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan) - val withProjection = if (withFilter.output != project) { -ProjectExec(project, withFilter) - } else { -withFilter - } - - withProjection :: Nil + // always add the projection, which will produce unsafe rows required by some operators + ProjectExec(project, withFilter) :: Nil case r: StreamingDataSourceV2Relation => - DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, r.reader) :: Nil + // ensure there is a projection, which will produce unsafe rows required by some operators + ProjectExec(r.output, --- End diff -- It is perfectly fine for sources to produce UnsafeRow because it is an InternalRow. I think it is important for us to get to InternalRow in this release. UnsafeRow is too hard to produce and the easiest thing to do is to produce InternalRow and then call into Spark's UnsafeProjection to produce UnsafeRow. That's painful, uses internal APIs, and is slower. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21305 @cloud-fan, I've rebased this so it is ready for final review when you get a chance. Thanks!
[GitHub] spark issue #21556: [SPARK-24549][SQL] Support Decimal type push down to the...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21556 I misunderstood how it was safe as well. It was Yuming's clarification that helped.
[GitHub] spark issue #21741: [SPARK-24718][SQL] Timestamp support pushdown to parquet...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21741 +1
[GitHub] spark pull request #21741: [SPARK-24718][SQL] Timestamp support pushdown to ...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21741#discussion_r202451540 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -378,6 +378,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED = +buildConf("spark.sql.parquet.filterPushdown.timestamp") + .doc("If true, enables Parquet filter push-down optimization for Timestamp. " + +"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is " + +"enabled and Timestamp stored as TIMESTAMP_MICROS or TIMESTAMP_MILLIS type.") --- End diff -- I would just note that push-down doesn't work for INT96 timestamps in the file. It should work for the others. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21741: [SPARK-24718][SQL] Timestamp support pushdown to ...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21741#discussion_r202451374 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -378,6 +378,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED = +buildConf("spark.sql.parquet.filterPushdown.timestamp") + .doc("If true, enables Parquet filter push-down optimization for Timestamp. " + +"This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is " + +"enabled and Timestamp stored as TIMESTAMP_MICROS or TIMESTAMP_MILLIS type.") +.internal() +.booleanConf +.createWithDefault(true) --- End diff -- Because we're using the file schema, it doesn't matter what the write configuration is now. It only matters what it was when the file was written. If the file has an INT96 timestamp, this should just not push anything down.
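To make the point about the file schema concrete, here is a minimal sketch of the decision being described. The `TimestampEncoding` types and `TimestampPushdownSketch` are hypothetical stand-ins invented for illustration, not the parquet-mr or Spark classes; the only thing taken from the thread is the rule that INT96 gets no push-down while TIMESTAMP_MILLIS and TIMESTAMP_MICROS do.

```scala
// Hypothetical stand-ins for how a timestamp column can be stored in a file.
// These are NOT the real parquet-mr types.
sealed trait TimestampEncoding
case object Int96 extends TimestampEncoding
case object TimestampMillis extends TimestampEncoding
case object TimestampMicros extends TimestampEncoding

object TimestampPushdownSketch {
  // The decision depends on the encoding found in the file being read,
  // not on whatever the write configuration happens to be today.
  def canPushDown(pushDownEnabled: Boolean, fileEncoding: TimestampEncoding): Boolean =
    pushDownEnabled && (fileEncoding match {
      case Int96 => false // INT96 in the file: push nothing down
      case TimestampMillis | TimestampMicros => true
    })
}
```

With this shape, a file written long ago with INT96 is simply read without a pushed filter, regardless of the current session settings.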
[GitHub] spark issue #21556: [SPARK-24549][SQL] Support Decimal type push down to the...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21556 +1, I think this looks ready to go.
[GitHub] spark issue #21556: [SPARK-24549][SQL] Support Decimal type push down to the...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21556 @HyukjinKwon, even if the values are null, the makeEq function only casts null to Java Integer so the handling is still safe. It just looks odd that `null.asInstanceOf[JInt]` is safe. Thanks to @wangyum for explaining it. Even if the null-safe equality predicate contains a null value, this should be safe. And, passing null in an equals predicate is supported by Parquet.
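For readers who find the cast surprising, a small sketch of the Scala behavior involved. It relies only on standard Scala semantics; `NullCastSketch` is a hypothetical name and the snippet is not taken from the PR.

```scala
import java.lang.{Integer => JInt}

object NullCastSketch {
  // Casting null to a boxed reference type is a no-op: the result is still null,
  // so no NullPointerException or ClassCastException is thrown.
  val boxed: JInt = null.asInstanceOf[JInt]

  // Casting null to a Scala primitive is different: it yields the default value.
  val primitive: Int = null.asInstanceOf[Int]
}
```

The boxed form is the one that matters here, since a null `JInt` can be handed on as a null filter value, while the primitive form would silently turn null into 0.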
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r202448032 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -37,41 +39,64 @@ import org.apache.spark.unsafe.types.UTF8String /** * Some utility function to convert Spark data source filters to Parquet filters. */ -private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) { +private[parquet] class ParquetFilters( +pushDownDate: Boolean, +pushDownDecimal: Boolean, +pushDownStartWith: Boolean) { private case class ParquetSchemaType( originalType: OriginalType, primitiveTypeName: PrimitiveTypeName, - decimalMetadata: DecimalMetadata) - - private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null) - private val ParquetByteType = ParquetSchemaType(INT_8, INT32, null) - private val ParquetShortType = ParquetSchemaType(INT_16, INT32, null) - private val ParquetIntegerType = ParquetSchemaType(null, INT32, null) - private val ParquetLongType = ParquetSchemaType(null, INT64, null) - private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null) - private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, null) - private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null) - private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null) - private val ParquetDateType = ParquetSchemaType(DATE, INT32, null) + length: Int, + decimalMeta: DecimalMetadata) + + private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, 0, null) + private val ParquetByteType = ParquetSchemaType(INT_8, INT32, 0, null) + private val ParquetShortType = ParquetSchemaType(INT_16, INT32, 0, null) + private val ParquetIntegerType = ParquetSchemaType(null, INT32, 0, null) + private val ParquetLongType = ParquetSchemaType(null, INT64, 0, null) + private val ParquetFloatType = ParquetSchemaType(null, FLOAT, 0, null) + private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, 0, null) + private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, 0, null) + private val ParquetBinaryType = ParquetSchemaType(null, BINARY, 0, null) + private val ParquetDateType = ParquetSchemaType(DATE, INT32, 0, null) private def dateToDays(date: Date): SQLDate = { DateTimeUtils.fromJavaDate(date) } + private def decimalToInt32(decimal: JBigDecimal): Integer = decimal.unscaledValue().intValue() + + private def decimalToInt64(decimal: JBigDecimal): JLong = decimal.unscaledValue().longValue() + + private def decimalToByteArray(decimal: JBigDecimal, numBytes: Int): Binary = { +val decimalBuffer = new Array[Byte](numBytes) +val bytes = decimal.unscaledValue().toByteArray + +val fixedLengthBytes = if (bytes.length == numBytes) { + bytes +} else { + val signByte = if (bytes.head < 0) -1: Byte else 0: Byte + java.util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, signByte) + System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, bytes.length) + decimalBuffer +} +Binary.fromReusedByteArray(fixedLengthBytes, 0, numBytes) + } + private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = { --- End diff -- Sounds good. Thanks!
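The `decimalToByteArray` helper in the diff pads the unscaled value out to a fixed width by sign extension. Below is a standalone sketch of just that padding step, with the Parquet `Binary` wrapper left out so it runs without dependencies. The name `DecimalBytesSketch.toFixedLength` and the `require` guard are additions for illustration, not part of the PR.

```scala
import java.math.{BigDecimal => JBigDecimal}

object DecimalBytesSketch {
  // Sign-extend the two's-complement unscaled value to exactly numBytes bytes.
  def toFixedLength(decimal: JBigDecimal, numBytes: Int): Array[Byte] = {
    val bytes = decimal.unscaledValue().toByteArray
    // Guard added for this sketch only: the value must fit in the target width.
    require(bytes.length <= numBytes,
      s"value needs ${bytes.length} bytes, only $numBytes available")
    // Negative values are padded with 0xFF bytes, non-negative ones with 0x00.
    val signByte: Byte = if (bytes.head < 0) (-1).toByte else 0.toByte
    val padded = Array.fill[Byte](numBytes)(signByte)
    // Copy the minimal encoding into the low-order (rightmost) positions.
    System.arraycopy(bytes, 0, padded, numBytes - bytes.length, bytes.length)
    padded
  }
}
```

Because the padding byte repeats the sign, reading the padded array back as a big-endian two's-complement integer gives the original unscaled value.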
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r202447544 --- Diff: sql/core/benchmarks/FilterPushdownBenchmark-results.txt --- @@ -292,120 +292,120 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz Select 1 decimal(9, 2) row (value = 7864320): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative -Parquet Vectorized 3785 / 3867 4.2 240.6 1.0X -Parquet Vectorized (Pushdown) 3820 / 3928 4.1 242.9 1.0X -Native ORC Vectorized 3981 / 4049 4.0 253.1 1.0X -Native ORC Vectorized (Pushdown) 702 / 735 22.4 44.6 5.4X +Parquet Vectorized 4407 / 4852 3.6 280.2 1.0X +Parquet Vectorized (Pushdown) 1602 / 1634 9.8 101.8 2.8X --- End diff -- Okay, I see. The tenths and hundredths are always 0, which makes the precision-8 numbers actually precision-10. It is still odd that this is causing Parquet to have no stats, but I'm happy with the fix. Thanks for explaining.
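The precision arithmetic can be checked directly with `java.math.BigDecimal`. This is only an illustration of the digit counting; the value `12345678` is a made-up eight-digit number, not one taken from the benchmark, and `DecimalPrecisionSketch` is a hypothetical name.

```scala
import java.math.{BigDecimal => JBigDecimal}

object DecimalPrecisionSketch {
  // A hypothetical eight-digit integer value (precision 8, scale 0).
  val eightDigits = new JBigDecimal("12345678")

  // Raising the scale to 2 appends two zero digits: the unscaled value
  // becomes 1234567800, which has ten digits.
  val atScale2: JBigDecimal = eightDigits.setScale(2)
}
```

Here `atScale2.precision` is 10, so such a value would not fit in a `decimal(9, 2)` column even though it looks like an eight-digit number.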
[GitHub] spark issue #21556: [SPARK-24549][SQL] Support Decimal type push down to the...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21556 @wangyum, can you explain what was happening with the `decimal(9,2)` benchmark more clearly? I asked additional questions, but the thread is on a line that changed so it's collapsed by default. Also, `valueCanMakeFilterOn` returns true for all null values, so I think we still have a problem there. Conversion from EqualNullSafe needs to support null filter values.
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r202093955 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -225,12 +316,44 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = { val nameToType = getFieldMap(schema) +def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match { + case decimal: JBigDecimal => +decimal.scale == decimalMeta.getScale + case _ => false +} + +// Since SPARK-24716, ParquetFilter accepts parquet file schema to convert to +// data source Filter. This must make sure that filter value matched the Filter. +// If doesn't matched, then the schema used to read the file is incorrect, +// which would cause data corruption. +def valueCanMakeFilterOn(name: String, value: Any): Boolean = { + value == null || (nameToType(name) match { +case ParquetBooleanType => value.isInstanceOf[JBoolean] +case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number] +case ParquetLongType => value.isInstanceOf[JLong] +case ParquetFloatType => value.isInstanceOf[JFloat] +case ParquetDoubleType => value.isInstanceOf[JDouble] +case ParquetStringType => value.isInstanceOf[String] +case ParquetBinaryType => value.isInstanceOf[Array[Byte]] +case ParquetDateType => value.isInstanceOf[Date] +case ParquetSchemaType(DECIMAL, INT32, 0, decimalMeta) => --- End diff -- Have you tried not using `|` and ignoring the physical type with `_`?
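A rough sketch of what that suggestion could look like, using simplified stand-in types so it compiles on its own. Everything inside `WildcardMatchSketch` (`SchemaType`, `DecimalMeta`, the enum-like case objects) is hypothetical and only mirrors the shape of `ParquetSchemaType` in the diff; it is not the code that was merged.

```scala
object WildcardMatchSketch {
  // Simplified stand-ins for the Parquet enums and metadata used in the diff.
  sealed trait OriginalType
  case object DECIMAL extends OriginalType
  case object DATE extends OriginalType

  sealed trait PrimitiveTypeName
  case object INT32 extends PrimitiveTypeName
  case object INT64 extends PrimitiveTypeName
  case object FIXED_LEN_BYTE_ARRAY extends PrimitiveTypeName

  final case class DecimalMeta(precision: Int, scale: Int)
  final case class SchemaType(
      originalType: OriginalType,
      primitiveTypeName: PrimitiveTypeName,
      length: Int,
      decimalMeta: DecimalMeta)

  def isDecimalMatched(value: Any, meta: DecimalMeta): Boolean = value match {
    case d: java.math.BigDecimal => d.scale == meta.scale
    case _ => false
  }

  // A single case with wildcards covers INT32, INT64 and fixed-length decimals,
  // instead of one alternative per physical type joined with `|`.
  def valueCanMakeFilterOn(tpe: SchemaType, value: Any): Boolean =
    value == null || (tpe match {
      case SchemaType(DECIMAL, _, _, meta) => isDecimalMatched(value, meta)
      case _ => false
    })
}
```

The wildcard form keeps the binding for the decimal metadata, which alternatives joined with `|` cannot do, since Scala does not allow variable bindings inside pattern alternatives.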