maropu commented on a change in pull request #29725:
URL: https://github.com/apache/spark/pull/29725#discussion_r526641582
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
##########
@@ -42,6 +43,139 @@ case class SortExec(
global: Boolean,
child: SparkPlan,
testSpillFrequency: Int = 0)
+ extends SortExecBase(
+ sortOrder,
+ global,
+ child,
+ testSpillFrequency) {
+
+ def createSorter(): UnsafeExternalRowSorter = {
+ rowSorter = UnsafeExternalRowSorter.create(
+ schema, ordering, prefixComparator, prefixComputer, pageSize, canUseRadixSort)
+
+ if (testSpillFrequency > 0) {
+ rowSorter.setTestSpillFrequency(testSpillFrequency)
+ }
+ rowSorter.asInstanceOf[UnsafeExternalRowSorter]
+ }
+
+ override protected def doProduce(ctx: CodegenContext): String = {
+ doProduce(ctx, classOf[UnsafeExternalRowSorter].getName)
+ }
+}
+
+/**
+ * Performs (external) sorting for multiple windows.
+ *
+ * @param partitionSpec a sequence of expressions that defines a partition key
+ * @param sortOrderInWindow a sequence of sort orders for sorting rows inside a window
+ * @param sortOrderAcrossWindows a sequence of sort orders for sorting rows across
+ * different windows on a Spark physical partition.
+ * This sequence of sort orders is obtained from a partition
+ * key plus a sequence of sort orders inside a window
+ * @param global when true performs a global sort of all partitions by shuffling the data first
+ * if necessary.
+ * @param testSpillFrequency Method for configuring periodic spilling in unit tests. If set, will
+ * spill every `frequency` records.
+ */
+case class WindowSortExec(
+ partitionSpec: Seq[Expression],
+ sortOrderInWindow: Seq[SortOrder],
+ sortOrderAcrossWindows: Seq[SortOrder],
+ global: Boolean,
+ child: SparkPlan,
+ testSpillFrequency: Int = 0)
+ extends SortExecBase(
+ sortOrderAcrossWindows,
+ global,
+ child,
+ testSpillFrequency) {
+
+ def createSorter(): UnsafeExternalRowWindowSorter = {
+ val partitionSpecGrouping = UnsafeProjection.create(partitionSpec, output)
+
+ // The schema of partition key
+ val partitionKeySchema: Seq[Attribute] = output.filter(x => {
+ x.references.subsetOf(AttributeSet(partitionSpec))
+ })
+
+ // Generate the ordering of partition key
+ val orderingOfPartitionKey = RowOrdering.create(
+ sortOrderAcrossWindows diff sortOrderInWindow,
+ partitionKeySchema)
+
+ // No prefix comparator
+ val nullPrefixComparator = new PrefixComparator {
+ override def compare(prefix1: Long, prefix2: Long): Int = 0
+ }
+
+ if (sortOrderInWindow == null || sortOrderInWindow.size == 0) {
Review comment:
If so, it seems we don't need this `if` section?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]