[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16898#discussion_r208098556

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -119,23 +130,45 @@ object FileFormatWriter extends Logging {
       uuid = UUID.randomUUID().toString,
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
-      allColumns = queryExecution.logical.output,
-      partitionColumns = partitionColumns,
+      allColumns = allColumns,
       dataColumns = dataColumns,
-      bucketSpec = bucketSpec,
+      partitionColumns = partitionColumns,
+      bucketIdExpression = bucketIdExpression,
       path = outputSpec.outputPath,
       customPartitionLocations = outputSpec.customPartitionLocations,
       maxRecordsPerFile = options.get("maxRecordsPerFile").map(_.toLong)
         .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile)
     )
+    // We should first sort by partition columns, then bucket id, and finally sorting columns.
+    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
+    // the sort order doesn't matter
+    val actualOrdering = queryExecution.executedPlan.outputOrdering.map(_.child)
--- End diff --

That would be great, but may need some refactoring.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
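The comment in the diff asks for rows sorted by partition columns, then bucket id, then sort columns. One way to see why, under simplifying assumptions: when every (partition, bucket id) key forms a single contiguous run, the writer needs only one open file at a time and switches files whenever the key changes. `Row`, `OutFile`, and `writeSorted` are hypothetical names for this illustration, not Spark APIs.

```scala
// Minimal sketch of the "one writer at a time" idea behind the required ordering.
// `Row` is a hypothetical stand-in for Spark's InternalRow, not a Spark API.
object SingleWriterSketch {
  final case class Row(partition: String, bucketId: Int, value: Int)

  // A "file" produced by the sketch: its (partition, bucketId) key and its values.
  final case class OutFile(partition: String, bucketId: Int, values: Vector[Int])

  // Writes rows with a single open file, switching files whenever the
  // (partition, bucketId) key changes. Each key ends up in exactly one file
  // only if the input is already sorted by partition, then bucket id.
  def writeSorted(rows: Iterator[Row]): Vector[OutFile] = {
    val files = Vector.newBuilder[OutFile]
    var currentKey: Option[(String, Int)] = None
    var buffer = Vector.empty[Int]
    for (row <- rows) {
      val nextKey = (row.partition, row.bucketId)
      if (!currentKey.contains(nextKey)) {
        // New key: close the current file (if any) and start a new one.
        currentKey.foreach { case (p, b) => files += OutFile(p, b, buffer) }
        currentKey = Some(nextKey)
        buffer = Vector.empty
      }
      buffer = buffer :+ row.value
    }
    // Close the last open file.
    currentKey.foreach { case (p, b) => files += OutFile(p, b, buffer) }
    files.result()
  }

  def main(args: Array[String]): Unit = {
    val sorted = Seq(Row("a", 0, 1), Row("a", 0, 2), Row("a", 1, 3), Row("b", 0, 4))
    writeSorted(sorted.iterator).foreach(println)
  }
}
```

With unsorted input the same key can reappear after a different key, and the sketch then opens a second file for it. That is why the writer must either sort the data itself or confirm that the child plan already provides the required ordering.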
Github user leachbj commented on a diff in the pull request:

https://github.com/apache/spark/pull/16898#discussion_r208094538

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -119,23 +130,45 @@ object FileFormatWriter extends Logging {
       uuid = UUID.randomUUID().toString,
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
-      allColumns = queryExecution.logical.output,
-      partitionColumns = partitionColumns,
+      allColumns = allColumns,
       dataColumns = dataColumns,
-      bucketSpec = bucketSpec,
+      partitionColumns = partitionColumns,
+      bucketIdExpression = bucketIdExpression,
       path = outputSpec.outputPath,
       customPartitionLocations = outputSpec.customPartitionLocations,
       maxRecordsPerFile = options.get("maxRecordsPerFile").map(_.toLong)
         .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile)
     )
+    // We should first sort by partition columns, then bucket id, and finally sorting columns.
+    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
+    // the sort order doesn't matter
+    val actualOrdering = queryExecution.executedPlan.outputOrdering.map(_.child)
--- End diff --

@cloud-fan would it be possible to use the logical plan rather than the executedPlan? If the optimizer decides the data is already sorted according to the logical plan, the executedPlan won't include the fields.
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16898

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16898#discussion_r101938630

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -287,31 +320,16 @@ object FileFormatWriter extends Logging {
    * multiple directories (partitions) or files (bucketing).
    */
   private class DynamicPartitionWriteTask(
-      description: WriteJobDescription,
+      desc: WriteJobDescription,
--- End diff --

I'd like to change both to make it consistent.
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16898#discussion_r101938607

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -108,9 +107,21 @@ object FileFormatWriter extends Logging {
     job.setOutputValueClass(classOf[InternalRow])
     FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
+    val allColumns = queryExecution.logical.output
     val partitionSet = AttributeSet(partitionColumns)
     val dataColumns = queryExecution.logical.output.filterNot(partitionSet.contains)
--- End diff --

It's so minor; I'll fix it in my next PR.
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16898#discussion_r101938559

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -119,23 +130,45 @@ object FileFormatWriter extends Logging {
       uuid = UUID.randomUUID().toString,
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
-      allColumns = queryExecution.logical.output,
-      partitionColumns = partitionColumns,
+      allColumns = allColumns,
       dataColumns = dataColumns,
-      bucketSpec = bucketSpec,
+      partitionColumns = partitionColumns,
+      bucketIdExpression = bucketIdExpression,
       path = outputSpec.outputPath,
       customPartitionLocations = outputSpec.customPartitionLocations,
       maxRecordsPerFile = options.get("maxRecordsPerFile").map(_.toLong)
         .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile)
     )
+    // We should first sort by partition columns, then bucket id, and finally sorting columns.
+    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
+    // the sort order doesn't matter
+    val actualOrdering = queryExecution.executedPlan.outputOrdering.map(_.child)
+    val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
+      false
+    } else {
+      requiredOrdering.zip(actualOrdering).forall {
+        case (requiredOrder, childOutputOrder) =>
+          requiredOrder.semanticEquals(childOutputOrder)
--- End diff --

It's `HashPartitioning(...).partitionIdExpression`, which returns `Pmod(new Murmur3Hash(expressions), Literal(numPartitions))`, so it may match.
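To make the reply concrete: a bucket id of the form `Pmod(hash, numBuckets)` is a hash of the bucket columns reduced with a positive modulo, so it always lands in `[0, numBuckets)` even when the hash is negative. The sketch below uses Scala's `MurmurHash3.stringHash` as a stand-in hash. It is not the same function as Spark's `Murmur3Hash`, so the concrete ids will differ from Spark's, and `BucketIdSketch`, `pmod`, and `bucketId` are illustrative names.

```scala
import scala.util.hashing.MurmurHash3

// Sketch of the shape of a bucket id expression: a hash of the bucket columns,
// reduced with a positive modulo. The hash is Scala's MurmurHash3 on a string
// key, NOT Spark's Murmur3Hash, so concrete bucket ids differ from Spark's.
object BucketIdSketch {
  // Positive modulo: the result is always in [0, n), even for negative `a`.
  // Plain `%` on Int keeps the sign of the dividend.
  def pmod(a: Int, n: Int): Int = {
    val r = a % n
    if (r < 0) r + n else r
  }

  // Hypothetical stand-in for Pmod(Murmur3Hash(bucketColumns), Literal(numBuckets)).
  def bucketId(key: String, numBuckets: Int): Int =
    pmod(MurmurHash3.stringHash(key), numBuckets)

  def main(args: Array[String]): Unit = {
    println(-7 % 3)      // plain remainder keeps the sign: -1
    println(pmod(-7, 3)) // positive modulo: 2
    println(Seq("k1", "k2", "k3").map(bucketId(_, 4)))
  }
}
```

Because the bucket id is an ordinary expression rather than a partitioning object, it can in principle appear in a child plan's `outputOrdering` and be compared with `semanticEquals`, which is what the reply suggests.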
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16898#discussion_r101888508

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -119,23 +130,45 @@ object FileFormatWriter extends Logging {
       uuid = UUID.randomUUID().toString,
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
-      allColumns = queryExecution.logical.output,
-      partitionColumns = partitionColumns,
+      allColumns = allColumns,
       dataColumns = dataColumns,
-      bucketSpec = bucketSpec,
+      partitionColumns = partitionColumns,
+      bucketIdExpression = bucketIdExpression,
       path = outputSpec.outputPath,
       customPartitionLocations = outputSpec.customPartitionLocations,
       maxRecordsPerFile = options.get("maxRecordsPerFile").map(_.toLong)
         .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile)
     )
+    // We should first sort by partition columns, then bucket id, and finally sorting columns.
+    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
+    // the sort order doesn't matter
+    val actualOrdering = queryExecution.executedPlan.outputOrdering.map(_.child)
+    val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
+      false
+    } else {
+      requiredOrdering.zip(actualOrdering).forall {
+        case (requiredOrder, childOutputOrder) =>
+          requiredOrder.semanticEquals(childOutputOrder)
--- End diff --

Because `bucketIdExpression` is `HashPartitioning`, this will never match, right?
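The `orderingMatched` logic quoted in the diff is a prefix check: the required ordering must match the leading elements of the child's output ordering, one by one. A minimal sketch, assuming a hypothetical `Expr` type whose `semanticEquals` compares only an id (a simplification of Catalyst's semantic equality, not its actual implementation):

```scala
// Sketch of the prefix check performed by `orderingMatched` in the diff.
// `Expr` is a hypothetical stand-in for a Catalyst expression; its
// `semanticEquals` is modeled (as an assumption) by comparing only `id`,
// so two references to the same column match even if their names differ.
object OrderingMatchSketch {
  final case class Expr(name: String, id: Long) {
    def semanticEquals(other: Expr): Boolean = id == other.id
  }

  // True when `required` is a prefix of `actual` under semantic equality.
  def orderingMatched(required: Seq[Expr], actual: Seq[Expr]): Boolean =
    if (required.length > actual.length) {
      // `zip` would silently truncate to the shorter side, so reject early.
      false
    } else {
      required.zip(actual).forall { case (r, a) => r.semanticEquals(a) }
    }

  def main(args: Array[String]): Unit = {
    val p = Expr("p", 1L)
    val b = Expr("bucketId", 2L)
    val s = Expr("s", 3L)
    println(orderingMatched(Seq(p, b), Seq(p, b, s))) // true: required is a prefix
    println(orderingMatched(Seq(p, s), Seq(p, b, s))) // false: second key differs
  }
}
```

The length check is not redundant: `zip` truncates to the shorter sequence, so without it a required ordering longer than the child's ordering would be reported as matched.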
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16898#discussion_r101888059

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -287,31 +320,16 @@ object FileFormatWriter extends Logging {
    * multiple directories (partitions) or files (bucketing).
    */
   private class DynamicPartitionWriteTask(
-      description: WriteJobDescription,
+      desc: WriteJobDescription,
--- End diff --

`SingleDirectoryWriteTask` is still using `description`. Change both or keep it unchanged?
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16898#discussion_r101888000

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -108,9 +107,21 @@ object FileFormatWriter extends Logging {
     job.setOutputValueClass(classOf[InternalRow])
     FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
+    val allColumns = queryExecution.logical.output
     val partitionSet = AttributeSet(partitionColumns)
     val dataColumns = queryExecution.logical.output.filterNot(partitionSet.contains)
--- End diff --

If we rewrite it to `val dataColumns = allColumns.filterNot(partitionColumns.contains)`, we do not need `partitionSet`.
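The two forms differ only in how membership is tested. A sketch, assuming a hypothetical `Attr` case class with plain structural equality (Spark's `AttributeSet` applies its own attribute-equality rules, which this does not model); `viaSeq` and `viaSet` are illustrative names:

```scala
// Sketch comparing the two ways of computing the data columns.
// `Attr` is a hypothetical stand-in for a Catalyst attribute and uses plain
// case-class equality; Spark's AttributeSet has its own equality rules, so this
// only illustrates the collection-level difference.
object DataColumnsSketch {
  final case class Attr(name: String, exprId: Long)

  // Suggested form: membership test directly on the Seq (linear scan per column).
  def viaSeq(all: Seq[Attr], partitionColumns: Seq[Attr]): Seq[Attr] =
    all.filterNot(partitionColumns.contains)

  // Form closer to the original code: build a set first, then test membership.
  def viaSet(all: Seq[Attr], partitionColumns: Seq[Attr]): Seq[Attr] = {
    val partitionSet = partitionColumns.toSet
    all.filterNot(partitionSet.contains)
  }

  def main(args: Array[String]): Unit = {
    val cols = Seq(Attr("a", 1L), Attr("p", 2L), Attr("b", 3L))
    println(viaSeq(cols, Seq(Attr("p", 2L))))
    println(viaSet(cols, Seq(Attr("p", 2L))))
  }
}
```

Both return the same columns in their original order; the set-based form trades an extra intermediate value for set lookups in place of a linear scan, which matters little when there are only a few partition columns.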
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16898#discussion_r101887954

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -119,23 +130,45 @@ object FileFormatWriter extends Logging {
       uuid = UUID.randomUUID().toString,
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
-      allColumns = queryExecution.logical.output,
-      partitionColumns = partitionColumns,
+      allColumns = allColumns,
       dataColumns = dataColumns,
-      bucketSpec = bucketSpec,
+      partitionColumns = partitionColumns,
+      bucketIdExpression = bucketIdExpression,
       path = outputSpec.outputPath,
       customPartitionLocations = outputSpec.customPartitionLocations,
       maxRecordsPerFile = options.get("maxRecordsPerFile").map(_.toLong)
         .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile)
     )
+    // We should first sort by partition columns, then bucket id, and finally sorting columns.
+    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
--- End diff --

`bucketIdExpression` should be replaced by `bucketColumns`, right?
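One consideration when weighing this suggestion: input sorted by the bucket column values is generally not grouped by bucket id, because the bucket function scatters neighboring values across buckets. A minimal sketch, assuming a toy bucket function (positive modulo of an integer value) in place of Spark's Murmur3-based one; `BucketOrderSketch`, `toyBucketId`, and `runs` are illustrative names:

```scala
// Sketch: rows sorted by the bucket column values are generally not grouped by
// bucket id. The bucket function is a toy positive modulo (an assumption), not
// Spark's Pmod(Murmur3Hash(...), numBuckets).
object BucketOrderSketch {
  def toyBucketId(value: Int, numBuckets: Int): Int = {
    val r = value % numBuckets
    if (r < 0) r + numBuckets else r
  }

  // Number of contiguous runs of equal bucket ids, i.e. how many files a
  // writer that switches files on every key change would open.
  def runs(bucketIds: Seq[Int]): Int =
    if (bucketIds.isEmpty) 0
    else 1 + bucketIds.zip(bucketIds.tail).count { case (x, y) => x != y }

  def main(args: Array[String]): Unit = {
    val values = Seq(0, 1, 2, 3, 4, 5) // already sorted by the bucket column
    val byValue = values.map(toyBucketId(_, 2))
    val byBucketId = values.sortBy(toyBucketId(_, 2)).map(toyBucketId(_, 2))
    println(runs(byValue))    // 6
    println(runs(byBucketId)) // 2
  }
}
```

Here six rows sorted by value produce six runs (files) for only two buckets, while ordering the same rows by bucket id produces two.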
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16898#discussion_r101320333

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -363,80 +393,42 @@ object FileFormatWriter extends Logging {
           committer.newTaskTempFile(taskAttemptContext, partDir, ext)
         }
-      currentWriter = description.outputWriterFactory.newInstance(
+      currentWriter = desc.outputWriterFactory.newInstance(
         path = path,
-        dataSchema = description.dataColumns.toStructType,
+        dataSchema = desc.dataColumns.toStructType,
         context = taskAttemptContext)
     }
     override def execute(iter: Iterator[InternalRow]): Set[String] = {
-      // We should first sort by partition columns, then bucket id, and finally sorting columns.
-      val sortingExpressions: Seq[Expression] =
-        description.partitionColumns ++ bucketIdExpression ++ sortColumns
-      val getSortingKey = UnsafeProjection.create(sortingExpressions, description.allColumns)
-
-      val sortingKeySchema = StructType(sortingExpressions.map {
-        case a: Attribute => StructField(a.name, a.dataType, a.nullable)
-        // The sorting expressions are all `Attribute` except bucket id.
-        case _ => StructField("bucketId", IntegerType, nullable = false)
-      })
+      val getPartitionColsAndBucketId = UnsafeProjection.create(
+        desc.partitionColumns ++ bucketIdExpression, desc.allColumns)
-      // Returns the data columns to be written given an input row
-      val getOutputRow = UnsafeProjection.create(
-        description.dataColumns, description.allColumns)
-
-      // Returns the partition path given a partition key.
-      val getPartitionStringFunc = UnsafeProjection.create(
-        Seq(Concat(partitionStringExpression)), description.partitionColumns)
-
-      // Sorts the data before write, so that we only need one writer at the same time.
-      val sorter = new UnsafeKVExternalSorter(
-        sortingKeySchema,
-        StructType.fromAttributes(description.dataColumns),
-        SparkEnv.get.blockManager,
-        SparkEnv.get.serializerManager,
-        TaskContext.get().taskMemoryManager().pageSizeBytes,
-        SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
-          UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
+      // Generates the partition path given the row generated by `getPartitionColsAndBucketId`.
+      val getPartPath = UnsafeProjection.create(
+        Seq(Concat(partitionPathExpression)), desc.partitionColumns)
-      while (iter.hasNext) {
-        val currentRow = iter.next()
-        sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
-      }
-
-      val getBucketingKey: InternalRow => InternalRow = if (sortColumns.isEmpty) {
-        identity
-      } else {
-        UnsafeProjection.create(sortingExpressions.dropRight(sortColumns.length).zipWithIndex.map {
-          case (expr, ordinal) => BoundReference(ordinal, expr.dataType, expr.nullable)
-        })
-      }
-
-      val sortedIterator = sorter.sortedIterator()
+      // Returns the data columns to be written given an input row
+      val getOutputRow = UnsafeProjection.create(desc.dataColumns, desc.allColumns)
       // If anything below fails, we should abort the task.
       var recordsInFile: Long = 0L
       var fileCounter = 0
-      var currentKey: UnsafeRow = null
+      var currentPartColsAndBucketId: UnsafeRow = null
       val updatedPartitions = mutable.Set[String]()
-      while (sortedIterator.next()) {
-        val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
-        if (currentKey != nextKey) {
-          // See a new key - write to a new partition (new file).
-          currentKey = nextKey.copy()
-          logDebug(s"Writing partition: $currentKey")
+      for (row <- iter) {
+        val nextPartColsAndBucketId = getPartitionColsAndBucketId(row)
--- End diff --

If you take a look at `GenerateUnsafeProjection`, it actually reuses the same row instance, so we need to copy.
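The reuse described in the comment can be illustrated without Spark. A minimal sketch, assuming a hypothetical `ReusingProjection` that, like the behavior described for generated unsafe projections, writes into one buffer and returns that same instance on every call: if the previous key is remembered by reference, it changes together with the next key, so later key changes go unnoticed.

```scala
// Sketch of why a projected key must be copied before it is remembered.
// `ReusingProjection` is hypothetical: it returns the same mutable buffer
// from every call, mimicking the reuse described in the review comment.
object ReusedRowSketch {
  final class ReusingProjection(f: Int => Int) {
    private val buffer = Array(0)
    def apply(input: Int): Array[Int] = {
      buffer(0) = f(input)
      buffer // same instance on every call
    }
  }

  // Counts how many key runs are detected while scanning `inputs`.
  // With copyKey = false the remembered key aliases the projection's buffer,
  // so it always equals the next key and later changes go unnoticed.
  def countKeyChanges(inputs: Seq[Int], copyKey: Boolean): Int = {
    val project = new ReusingProjection(_ / 10) // key = input / 10
    var currentKey: Array[Int] = null
    var changes = 0
    for (x <- inputs) {
      val nextKey = project(x)
      if (currentKey == null || currentKey(0) != nextKey(0)) {
        changes += 1
        currentKey = if (copyKey) nextKey.clone() else nextKey
      }
    }
    changes
  }

  def main(args: Array[String]): Unit = {
    val inputs = Seq(1, 2, 15, 31) // keys 0, 0, 1, 3
    println(countKeyChanges(inputs, copyKey = true))  // 3
    println(countKeyChanges(inputs, copyKey = false)) // 1
  }
}
```

This mirrors the removed code in the diff, which stored `currentKey = nextKey.copy()` before comparing it with the next key.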