[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

2018-08-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16898#discussion_r208098556
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -119,23 +130,45 @@ object FileFormatWriter extends Logging {
       uuid = UUID.randomUUID().toString,
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
-      allColumns = queryExecution.logical.output,
-      partitionColumns = partitionColumns,
+      allColumns = allColumns,
       dataColumns = dataColumns,
-      bucketSpec = bucketSpec,
+      partitionColumns = partitionColumns,
+      bucketIdExpression = bucketIdExpression,
       path = outputSpec.outputPath,
       customPartitionLocations = outputSpec.customPartitionLocations,
       maxRecordsPerFile = options.get("maxRecordsPerFile").map(_.toLong)
         .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile)
     )
 
+    // We should first sort by partition columns, then bucket id, and finally sorting columns.
+    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
+    // the sort order doesn't matter
+    val actualOrdering = queryExecution.executedPlan.outputOrdering.map(_.child)
--- End diff --

That would be great, but may need some refactoring.



[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

2018-08-06 Thread leachbj
Github user leachbj commented on a diff in the pull request:

https://github.com/apache/spark/pull/16898#discussion_r208094538
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -119,23 +130,45 @@ object FileFormatWriter extends Logging {
       uuid = UUID.randomUUID().toString,
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
-      allColumns = queryExecution.logical.output,
-      partitionColumns = partitionColumns,
+      allColumns = allColumns,
       dataColumns = dataColumns,
-      bucketSpec = bucketSpec,
+      partitionColumns = partitionColumns,
+      bucketIdExpression = bucketIdExpression,
       path = outputSpec.outputPath,
       customPartitionLocations = outputSpec.customPartitionLocations,
       maxRecordsPerFile = options.get("maxRecordsPerFile").map(_.toLong)
         .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile)
     )
 
+    // We should first sort by partition columns, then bucket id, and finally sorting columns.
+    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
+    // the sort order doesn't matter
+    val actualOrdering = queryExecution.executedPlan.outputOrdering.map(_.child)
--- End diff --

@cloud-fan would it be possible to use the logical plan rather than the executedPlan? If the optimizer decides the data is already sorted according to the logical plan, the executedPlan won't include those fields.





[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

2017-02-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16898





[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

2017-02-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16898#discussion_r101938630
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -287,31 +320,16 @@ object FileFormatWriter extends Logging {
    * multiple directories (partitions) or files (bucketing).
    */
   private class DynamicPartitionWriteTask(
-      description: WriteJobDescription,
+      desc: WriteJobDescription,
--- End diff --

I'd like to change both, to keep the naming consistent.





[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

2017-02-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16898#discussion_r101938607
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -108,9 +107,21 @@ object FileFormatWriter extends Logging {
     job.setOutputValueClass(classOf[InternalRow])
     FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
 
+    val allColumns = queryExecution.logical.output
     val partitionSet = AttributeSet(partitionColumns)
     val dataColumns = queryExecution.logical.output.filterNot(partitionSet.contains)
--- End diff --

It's so minor that I'll fix it in my next PR.





[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

2017-02-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16898#discussion_r101938559
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -119,23 +130,45 @@ object FileFormatWriter extends Logging {
       uuid = UUID.randomUUID().toString,
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
-      allColumns = queryExecution.logical.output,
-      partitionColumns = partitionColumns,
+      allColumns = allColumns,
       dataColumns = dataColumns,
-      bucketSpec = bucketSpec,
+      partitionColumns = partitionColumns,
+      bucketIdExpression = bucketIdExpression,
       path = outputSpec.outputPath,
       customPartitionLocations = outputSpec.customPartitionLocations,
       maxRecordsPerFile = options.get("maxRecordsPerFile").map(_.toLong)
         .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile)
     )
 
+    // We should first sort by partition columns, then bucket id, and finally sorting columns.
+    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
+    // the sort order doesn't matter
+    val actualOrdering = queryExecution.executedPlan.outputOrdering.map(_.child)
+    val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
+      false
+    } else {
+      requiredOrdering.zip(actualOrdering).forall {
+        case (requiredOrder, childOutputOrder) =>
+          requiredOrder.semanticEquals(childOutputOrder)
--- End diff --

It's `HashPartitioning(...).partitionIdExpression`, which returns `Pmod(new Murmur3Hash(expressions), Literal(numPartitions))`, so it may match.
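
For reference, a minimal sketch (not from the PR; the attribute name and bucket count are made up) of why the comparison can succeed: building the bucket-id expression twice over the same attribute yields semantically equal trees.

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal, Murmur3Hash, Pmod}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.types.IntegerType

val bucketCol = AttributeReference("id", IntegerType)()   // hypothetical bucket column

// What FileFormatWriter derives as the bucket id expression.
val required = HashPartitioning(Seq(bucketCol), 8).partitionIdExpression

// An ordering expression a child plan could report for the same column.
val actual = Pmod(new Murmur3Hash(Seq(bucketCol)), Literal(8))

// Same attribute, same tree shape, so semanticEquals is expected to hold.
println(required.semanticEquals(actual))
```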





[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

2017-02-18 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16898#discussion_r101888508
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -119,23 +130,45 @@ object FileFormatWriter extends Logging {
       uuid = UUID.randomUUID().toString,
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
-      allColumns = queryExecution.logical.output,
-      partitionColumns = partitionColumns,
+      allColumns = allColumns,
       dataColumns = dataColumns,
-      bucketSpec = bucketSpec,
+      partitionColumns = partitionColumns,
+      bucketIdExpression = bucketIdExpression,
       path = outputSpec.outputPath,
       customPartitionLocations = outputSpec.customPartitionLocations,
       maxRecordsPerFile = options.get("maxRecordsPerFile").map(_.toLong)
         .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile)
     )
 
+    // We should first sort by partition columns, then bucket id, and finally sorting columns.
+    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
+    // the sort order doesn't matter
+    val actualOrdering = queryExecution.executedPlan.outputOrdering.map(_.child)
+    val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
+      false
+    } else {
+      requiredOrdering.zip(actualOrdering).forall {
+        case (requiredOrder, childOutputOrder) =>
+          requiredOrder.semanticEquals(childOutputOrder)
--- End diff --

Because `bucketIdExpression` is `HashPartitioning`, this will never match, right?





[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

2017-02-18 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16898#discussion_r101888059
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -287,31 +320,16 @@ object FileFormatWriter extends Logging {
    * multiple directories (partitions) or files (bucketing).
    */
   private class DynamicPartitionWriteTask(
-      description: WriteJobDescription,
+      desc: WriteJobDescription,
--- End diff --

`SingleDirectoryWriteTask` is still using `description`. Change both or keep it unchanged?





[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

2017-02-18 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16898#discussion_r101888000
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -108,9 +107,21 @@ object FileFormatWriter extends Logging {
     job.setOutputValueClass(classOf[InternalRow])
     FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
 
+    val allColumns = queryExecution.logical.output
     val partitionSet = AttributeSet(partitionColumns)
     val dataColumns = queryExecution.logical.output.filterNot(partitionSet.contains)
--- End diff --

If we rewrite it as `val dataColumns = allColumns.filterNot(partitionColumns.contains)`, we do not need `partitionSet`.
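
A small sketch (hypothetical attributes, not from the PR) comparing the current form with the suggested one; both drop the partition columns from the output, the second just skips the intermediate `AttributeSet`:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet}
import org.apache.spark.sql.types.{IntegerType, StringType}

// Hypothetical output attributes of the query being written.
val id   = AttributeReference("id", IntegerType)()
val name = AttributeReference("name", StringType)()
val year = AttributeReference("year", IntegerType)()

val allColumns       = Seq(id, name, year)
val partitionColumns = Seq(year)

// Current form: membership is checked through an AttributeSet (by exprId).
val partitionSet = AttributeSet(partitionColumns)
val dataColumns  = allColumns.filterNot(partitionSet.contains)

// Suggested form: plain Attribute equality, no intermediate set needed.
val dataColumnsAlt = allColumns.filterNot(partitionColumns.contains)

println(dataColumns == dataColumnsAlt)  // true here: both keep Seq(id, name)
```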





[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

2017-02-18 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16898#discussion_r101887954
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -119,23 +130,45 @@ object FileFormatWriter extends Logging {
       uuid = UUID.randomUUID().toString,
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
-      allColumns = queryExecution.logical.output,
-      partitionColumns = partitionColumns,
+      allColumns = allColumns,
       dataColumns = dataColumns,
-      bucketSpec = bucketSpec,
+      partitionColumns = partitionColumns,
+      bucketIdExpression = bucketIdExpression,
       path = outputSpec.outputPath,
       customPartitionLocations = outputSpec.customPartitionLocations,
       maxRecordsPerFile = options.get("maxRecordsPerFile").map(_.toLong)
         .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile)
     )
 
+    // We should first sort by partition columns, then bucket id, and finally sorting columns.
+    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
--- End diff --

`bucketIdExpression` should be replaced by `bucketColumns`, right?





[GitHub] spark pull request #16898: [SPARK-19563][SQL] avoid unnecessary sort in File...

2017-02-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16898#discussion_r101320333
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---
@@ -363,80 +393,42 @@ object FileFormatWriter extends Logging {
         committer.newTaskTempFile(taskAttemptContext, partDir, ext)
       }
 
-      currentWriter = description.outputWriterFactory.newInstance(
+      currentWriter = desc.outputWriterFactory.newInstance(
         path = path,
-        dataSchema = description.dataColumns.toStructType,
+        dataSchema = desc.dataColumns.toStructType,
         context = taskAttemptContext)
     }
 
     override def execute(iter: Iterator[InternalRow]): Set[String] = {
-      // We should first sort by partition columns, then bucket id, and finally sorting columns.
-      val sortingExpressions: Seq[Expression] =
-        description.partitionColumns ++ bucketIdExpression ++ sortColumns
-      val getSortingKey = UnsafeProjection.create(sortingExpressions, description.allColumns)
-
-      val sortingKeySchema = StructType(sortingExpressions.map {
-        case a: Attribute => StructField(a.name, a.dataType, a.nullable)
-        // The sorting expressions are all `Attribute` except bucket id.
-        case _ => StructField("bucketId", IntegerType, nullable = false)
-      })
+      val getPartitionColsAndBucketId = UnsafeProjection.create(
+        desc.partitionColumns ++ bucketIdExpression, desc.allColumns)
 
-      // Returns the data columns to be written given an input row
-      val getOutputRow = UnsafeProjection.create(
-        description.dataColumns, description.allColumns)
-
-      // Returns the partition path given a partition key.
-      val getPartitionStringFunc = UnsafeProjection.create(
-        Seq(Concat(partitionStringExpression)), description.partitionColumns)
-
-      // Sorts the data before write, so that we only need one writer at the same time.
-      val sorter = new UnsafeKVExternalSorter(
-        sortingKeySchema,
-        StructType.fromAttributes(description.dataColumns),
-        SparkEnv.get.blockManager,
-        SparkEnv.get.serializerManager,
-        TaskContext.get().taskMemoryManager().pageSizeBytes,
-        SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
-          UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
+      // Generates the partition path given the row generated by `getPartitionColsAndBucketId`.
+      val getPartPath = UnsafeProjection.create(
+        Seq(Concat(partitionPathExpression)), desc.partitionColumns)
 
-      while (iter.hasNext) {
-        val currentRow = iter.next()
-        sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
-      }
-
-      val getBucketingKey: InternalRow => InternalRow = if (sortColumns.isEmpty) {
-        identity
-      } else {
-        UnsafeProjection.create(sortingExpressions.dropRight(sortColumns.length).zipWithIndex.map {
-          case (expr, ordinal) => BoundReference(ordinal, expr.dataType, expr.nullable)
-        })
-      }
-
-      val sortedIterator = sorter.sortedIterator()
+      // Returns the data columns to be written given an input row
+      val getOutputRow = UnsafeProjection.create(desc.dataColumns, desc.allColumns)
 
       // If anything below fails, we should abort the task.
       var recordsInFile: Long = 0L
       var fileCounter = 0
-      var currentKey: UnsafeRow = null
+      var currentPartColsAndBucketId: UnsafeRow = null
       val updatedPartitions = mutable.Set[String]()
-      while (sortedIterator.next()) {
-        val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
-        if (currentKey != nextKey) {
-          // See a new key - write to a new partition (new file).
-          currentKey = nextKey.copy()
-          logDebug(s"Writing partition: $currentKey")
+      for (row <- iter) {
+        val nextPartColsAndBucketId = getPartitionColsAndBucketId(row)
--- End diff --

If you take a look at `GenerateUnsafeProjection`, it actually reuses the same row instance, so we need to copy.
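
A minimal sketch (not from the PR; the attribute and values are made up) of that reuse behaviour: the generated projection keeps writing into one reused `UnsafeRow` buffer, so a reference held across iterations must be copied.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, UnsafeProjection}
import org.apache.spark.sql.types.IntegerType

val part = AttributeReference("part", IntegerType)()
val proj = UnsafeProjection.create(Seq(part), Seq(part))

val first = proj(InternalRow(1))   // points at the projection's reused buffer
proj(InternalRow(2))               // second apply overwrites that same buffer
println(first.getInt(0))           // prints 2, not 1

val saved = proj(InternalRow(1)).copy()   // copy() detaches the row from the buffer
proj(InternalRow(2))
println(saved.getInt(0))           // prints 1 as expected
```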

