maropu commented on a change in pull request #29725:
URL: https://github.com/apache/spark/pull/29725#discussion_r525834434



##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowWindowSorter.java
##########
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.collection.Iterator;
+import scala.math.Ordering;
+
+import org.apache.spark.memory.SparkOutOfMemoryError;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.collection.unsafe.sort.PrefixComparator;
+import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
+
+public final class UnsafeExternalRowWindowSorter extends 
AbstractUnsafeExternalRowSorter {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(UnsafeExternalRowWindowSorter.class);
+
+  private final StructType schema;
+  private final UnsafeProjection partitionSpecProjection;
+  private final Ordering<InternalRow> orderingOfPartitionKey;
+  private final Ordering<InternalRow> orderingInWindow;
+  private final Ordering<InternalRow> orderingAcrossWindows;
+  private final PrefixComparator prefixComparatorInWindow;
+  private final UnsafeExternalRowSorter.PrefixComputer prefixComputerInWindow;
+  private final boolean canUseRadixSortInWindow;
+  private final long pageSizeBytes;
+  private static final int windowSorterMapMaxSize = 1;
+  private static final int totalNumSorters = windowSorterMapMaxSize + 1;
+  private final HashMap<UnsafeRow,AbstractUnsafeExternalRowSorter> 
windowSorterMap;

Review comment:
       nit: `HashMap<UnsafeRow, AbstractUnsafeExternalRowSorter>`

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
##########
@@ -42,6 +43,139 @@ case class SortExec(
     global: Boolean,
     child: SparkPlan,
     testSpillFrequency: Int = 0)
+  extends SortExecBase(
+    sortOrder,
+    global,
+    child,
+    testSpillFrequency) {
+
+  def createSorter(): UnsafeExternalRowSorter = {
+    rowSorter = UnsafeExternalRowSorter.create(
+      schema, ordering, prefixComparator, prefixComputer, pageSize, 
canUseRadixSort)
+
+    if (testSpillFrequency > 0) {
+      rowSorter.setTestSpillFrequency(testSpillFrequency)
+    }
+    rowSorter.asInstanceOf[UnsafeExternalRowSorter]
+  }
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+    doProduce(ctx, classOf[UnsafeExternalRowSorter].getName)
+  }
+}
+
+/**
+ * Performs (external) sorting for multiple windows.

Review comment:
       Could you leave some comments about the difference from `SortExec`?

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowWindowSorter.java
##########
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.collection.Iterator;
+import scala.math.Ordering;
+
+import org.apache.spark.memory.SparkOutOfMemoryError;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.collection.unsafe.sort.PrefixComparator;
+import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
+
+public final class UnsafeExternalRowWindowSorter extends 
AbstractUnsafeExternalRowSorter {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(UnsafeExternalRowWindowSorter.class);
+
+  private final StructType schema;
+  private final UnsafeProjection partitionSpecProjection;
+  private final Ordering<InternalRow> orderingOfPartitionKey;
+  private final Ordering<InternalRow> orderingInWindow;
+  private final Ordering<InternalRow> orderingAcrossWindows;
+  private final PrefixComparator prefixComparatorInWindow;
+  private final UnsafeExternalRowSorter.PrefixComputer prefixComputerInWindow;
+  private final boolean canUseRadixSortInWindow;
+  private final long pageSizeBytes;
+  private static final int windowSorterMapMaxSize = 1;
+  private static final int totalNumSorters = windowSorterMapMaxSize + 1;
+  private final HashMap<UnsafeRow,AbstractUnsafeExternalRowSorter> 
windowSorterMap;
+  private final UnsafeExternalRowSorter mainSorter;
+  private final RowComparator partitionKeyComparator;
+
+  private long numRowsInserted = 0;
+
+  private UnsafeExternalRowSorter createUnsafeExternalRowSorterForWindow() 
throws IOException {
+    UnsafeExternalRowSorter sorter = null;
+    try {
+      if (this.orderingInWindow == null) {
+        sorter = UnsafeExternalRowSorter.createWithRecordComparator(
+          this.schema,
+          (Supplier<RecordComparator>)null,
+          prefixComparatorInWindow,
+          prefixComputerInWindow,
+          pageSizeBytes/totalNumSorters,
+          false);
+      } else {
+        sorter = UnsafeExternalRowSorter.create(
+          this.schema,
+          this.orderingInWindow,
+          this.prefixComparatorInWindow,
+          this.prefixComputerInWindow,
+          this.pageSizeBytes/totalNumSorters,
+          this.canUseRadixSortInWindow);
+      }
+    } catch (SparkOutOfMemoryError e) {
+      logger.error("Unable to create UnsafeExternalRowSorter due to 
SparkOutOfMemoryError.");
+      return null;
+    }
+    return sorter;
+  }
+
+  /**
+  * Returns an UnsafeExternalRowWindowSorter object.

Review comment:
       nit: wrong format.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
##########
@@ -42,6 +43,139 @@ case class SortExec(
     global: Boolean,
     child: SparkPlan,
     testSpillFrequency: Int = 0)
+  extends SortExecBase(
+    sortOrder,
+    global,
+    child,
+    testSpillFrequency) {
+
+  def createSorter(): UnsafeExternalRowSorter = {
+    rowSorter = UnsafeExternalRowSorter.create(
+      schema, ordering, prefixComparator, prefixComputer, pageSize, 
canUseRadixSort)
+
+    if (testSpillFrequency > 0) {
+      rowSorter.setTestSpillFrequency(testSpillFrequency)
+    }
+    rowSorter.asInstanceOf[UnsafeExternalRowSorter]
+  }
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+    doProduce(ctx, classOf[UnsafeExternalRowSorter].getName)
+  }
+}
+
+/**
+ * Performs (external) sorting for multiple windows.
+ *
+ * @param partitionSpec a sequence of expressions that defines a partition key
+ * @param sortOrderInWindow a sequence of sort orders for sorting rows inside 
a window
+ * @param sortOrderAcrossWindows a sequence of sort orders for sorting rows 
across
+ *                               different windows on a Spark physical 
partition.
+ *                               This sequence of sort orders is obtained from 
a partition
+ *                               key plus a sequence of sort orders inside a 
window
+ * @param global when true performs a global sort of all partitions by 
shuffling the data first
+ *               if necessary.
+ * @param testSpillFrequency Method for configuring periodic spilling in unit 
tests. If set, will
+ *                           spill every `frequency` records.
+ */
+case class WindowSortExec(
+    partitionSpec: Seq[Expression],
+    sortOrderInWindow: Seq[SortOrder],
+    sortOrderAcrossWindows: Seq[SortOrder],
+    global: Boolean,
+    child: SparkPlan,
+    testSpillFrequency: Int = 0)
+  extends SortExecBase(
+    sortOrderAcrossWindows,
+    global,
+    child,
+    testSpillFrequency) {
+
+  def createSorter(): UnsafeExternalRowWindowSorter = {
+    val partitionSpecGrouping = UnsafeProjection.create(partitionSpec, output)
+
+    // The schema of partition key
+    val partitionKeySchema: Seq[Attribute] = output.filter(x => {
+      x.references.subsetOf(AttributeSet(partitionSpec))
+    })
+
+    // Generate the ordering of partition key
+    val orderingOfPartitionKey = RowOrdering.create(
+      sortOrderAcrossWindows diff sortOrderInWindow,
+      partitionKeySchema)
+
+    // No prefix comparator
+    val nullPrefixComparator = new PrefixComparator {
+      override def compare(prefix1: Long, prefix2: Long): Int = 0
+    }
+
+    if (sortOrderInWindow == null || sortOrderInWindow.size == 0) {

Review comment:
       When `sortOrderInWindow == null` or `sortOrderInWindow.size == 0`, do we need `WindowSortExec`?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
##########
@@ -28,6 +28,7 @@ import 
org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
 import org.apache.spark.sql.execution.metric.SQLMetrics

Review comment:
       nit: the file name: `SortExec.scala` -> `sorts.scala`?

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowWindowSorter.java
##########
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.collection.Iterator;
+import scala.math.Ordering;
+
+import org.apache.spark.memory.SparkOutOfMemoryError;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.collection.unsafe.sort.PrefixComparator;
+import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
+
+public final class UnsafeExternalRowWindowSorter extends 
AbstractUnsafeExternalRowSorter {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(UnsafeExternalRowWindowSorter.class);
+
+  private final StructType schema;
+  private final UnsafeProjection partitionSpecProjection;
+  private final Ordering<InternalRow> orderingOfPartitionKey;
+  private final Ordering<InternalRow> orderingInWindow;
+  private final Ordering<InternalRow> orderingAcrossWindows;
+  private final PrefixComparator prefixComparatorInWindow;
+  private final UnsafeExternalRowSorter.PrefixComputer prefixComputerInWindow;
+  private final boolean canUseRadixSortInWindow;
+  private final long pageSizeBytes;
+  private static final int windowSorterMapMaxSize = 1;
+  private static final int totalNumSorters = windowSorterMapMaxSize + 1;
+  private final HashMap<UnsafeRow,AbstractUnsafeExternalRowSorter> 
windowSorterMap;
+  private final UnsafeExternalRowSorter mainSorter;
+  private final RowComparator partitionKeyComparator;
+
+  private long numRowsInserted = 0;
+
+  private UnsafeExternalRowSorter createUnsafeExternalRowSorterForWindow() 
throws IOException {
+    UnsafeExternalRowSorter sorter = null;
+    try {
+      if (this.orderingInWindow == null) {
+        sorter = UnsafeExternalRowSorter.createWithRecordComparator(
+          this.schema,
+          (Supplier<RecordComparator>)null,
+          prefixComparatorInWindow,
+          prefixComputerInWindow,
+          pageSizeBytes/totalNumSorters,
+          false);

Review comment:
       nit: `pageSizeBytes/totalNumSorters,` -> `pageSizeBytes / 
totalNumSorters,`

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
##########
@@ -42,6 +43,139 @@ case class SortExec(
     global: Boolean,
     child: SparkPlan,
     testSpillFrequency: Int = 0)
+  extends SortExecBase(
+    sortOrder,
+    global,
+    child,
+    testSpillFrequency) {
+
+  def createSorter(): UnsafeExternalRowSorter = {

Review comment:
       We need `override` here.

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowWindowSorter.java
##########
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.collection.Iterator;
+import scala.math.Ordering;
+
+import org.apache.spark.memory.SparkOutOfMemoryError;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.collection.unsafe.sort.PrefixComparator;
+import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
+
+public final class UnsafeExternalRowWindowSorter extends 
AbstractUnsafeExternalRowSorter {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(UnsafeExternalRowWindowSorter.class);
+
+  private final StructType schema;
+  private final UnsafeProjection partitionSpecProjection;
+  private final Ordering<InternalRow> orderingOfPartitionKey;
+  private final Ordering<InternalRow> orderingInWindow;
+  private final Ordering<InternalRow> orderingAcrossWindows;
+  private final PrefixComparator prefixComparatorInWindow;
+  private final UnsafeExternalRowSorter.PrefixComputer prefixComputerInWindow;
+  private final boolean canUseRadixSortInWindow;
+  private final long pageSizeBytes;
+  private static final int windowSorterMapMaxSize = 1;
+  private static final int totalNumSorters = windowSorterMapMaxSize + 1;
+  private final HashMap<UnsafeRow,AbstractUnsafeExternalRowSorter> 
windowSorterMap;
+  private final UnsafeExternalRowSorter mainSorter;
+  private final RowComparator partitionKeyComparator;
+
+  private long numRowsInserted = 0;
+
+  private UnsafeExternalRowSorter createUnsafeExternalRowSorterForWindow() 
throws IOException {
+    UnsafeExternalRowSorter sorter = null;
+    try {
+      if (this.orderingInWindow == null) {
+        sorter = UnsafeExternalRowSorter.createWithRecordComparator(
+          this.schema,
+          (Supplier<RecordComparator>)null,
+          prefixComparatorInWindow,
+          prefixComputerInWindow,
+          pageSizeBytes/totalNumSorters,
+          false);
+      } else {
+        sorter = UnsafeExternalRowSorter.create(
+          this.schema,
+          this.orderingInWindow,
+          this.prefixComparatorInWindow,
+          this.prefixComputerInWindow,
+          this.pageSizeBytes/totalNumSorters,
+          this.canUseRadixSortInWindow);
+      }
+    } catch (SparkOutOfMemoryError e) {
+      logger.error("Unable to create UnsafeExternalRowSorter due to 
SparkOutOfMemoryError.");
+      return null;
+    }
+    return sorter;
+  }
+
+  /**
+  * Returns an UnsafeExternalRowWindowSorter object.
+  * @param  schema  The schema of each input row
+  * @param  partitionSpecProjection an UnsafeProjection object created from
+  *                                 a sequence of partition expressions
+  * @param  orderingOfPartitionKey an ordering of internal rows that compares
+  *                                internal rows based on a partition key
+  * @param orderingInWindow an ordering of internal rows inside a window
+  * @param orderingAcrossWindows an ordering of internal rows across different
+  *                              windows on a Spark physical partition. This
+  *                              ordering is obtained from a partition key plus
+  *                              an ordering inside a window
+  * @param prefixComparatorInWindow a prefix comparator for sorting rows in a 
window
+  * @param prefixComparatorAcrossWindows a prefix comparator for sorting rows 
across
+  *                                      different windows on a Spark physical 
partition
+  * @param prefixComputerInWindow a prefix computer to calculate the prefix of 
a row
+  *                               based on the sort order inside a window
+  * @param prefixComputerAcrossWindows a prefix computer to calculate the 
prefix of a row
+  *                                    based on the sort order across 
different windows on
+  *                                    a Spark physical partition
+  * @param canUseRadixSortInWindow whether to use radix sort to sort the rows 
inside
+  *                                a window
+  * @param canUseRadixSortAcrossWindows whether to use radix sort to sort the 
rows across
+  *                                     different windows on a Spark physical 
partition
+  * @param pageSizeBytes the size of a page in bytes
+  */
+  public static UnsafeExternalRowWindowSorter create(
+      StructType schema,
+      UnsafeProjection partitionSpecProjection,
+      Ordering<InternalRow> orderingOfPartitionKey,
+      Ordering<InternalRow> orderingInWindow,
+      Ordering<InternalRow> orderingAcrossWindows,
+      PrefixComparator prefixComparatorInWindow,
+      PrefixComparator prefixComparatorAcrossWindows,
+      UnsafeExternalRowSorter.PrefixComputer prefixComputerInWindow,
+      UnsafeExternalRowSorter.PrefixComputer prefixComputerAcrossWindows,
+      boolean canUseRadixSortInWindow,
+      boolean canUseRadixSortAcrossWindows,
+      long pageSizeBytes) throws IOException {
+    UnsafeExternalRowSorter mainSorter = UnsafeExternalRowSorter.create(
+      schema,
+      orderingAcrossWindows,
+      prefixComparatorAcrossWindows,
+      prefixComputerAcrossWindows,
+      pageSizeBytes/totalNumSorters,
+      canUseRadixSortAcrossWindows);
+
+    return new UnsafeExternalRowWindowSorter(
+      mainSorter,
+      schema,
+      partitionSpecProjection,
+      orderingOfPartitionKey,
+      orderingInWindow,
+      orderingAcrossWindows,
+      prefixComparatorInWindow,
+      prefixComputerInWindow,
+      canUseRadixSortInWindow,
+      pageSizeBytes);
+  }
+
+  private UnsafeExternalRowWindowSorter(
+      UnsafeExternalRowSorter mainSorter,
+      StructType schema,
+      UnsafeProjection partitionSpecProjection,
+      Ordering<InternalRow> orderingOfPartitionKey,
+      Ordering<InternalRow> orderingInWindow,
+      Ordering<InternalRow> orderingAcrossWindows,
+      PrefixComparator prefixComparatorInWindow,
+      UnsafeExternalRowSorter.PrefixComputer prefixComputerInWindow,
+      boolean canUseRadixSortInWindow,
+      long pageSizeBytes) {
+    this.mainSorter = mainSorter;
+    this.schema = schema;
+    this.partitionSpecProjection = partitionSpecProjection;
+    this.orderingOfPartitionKey = orderingOfPartitionKey;
+    this.orderingInWindow = orderingInWindow;
+    this.orderingAcrossWindows = orderingAcrossWindows;
+    this.prefixComparatorInWindow = prefixComparatorInWindow;
+    this.prefixComputerInWindow = prefixComputerInWindow;
+    this.canUseRadixSortInWindow = canUseRadixSortInWindow;
+    this.pageSizeBytes = pageSizeBytes;
+    this.windowSorterMap = new 
HashMap<UnsafeRow,AbstractUnsafeExternalRowSorter>(
+      windowSorterMapMaxSize);
+    this.partitionKeyComparator = new RowComparator(orderingOfPartitionKey);
+  }
+
+  /**
+   * If the partition key is found in the hash map, then the rows will be 
inserted to the
+   * unsafe external row sorter corresponding to the partition key. Otherwise 
a new unsafe
+   * will be created, and this row will be added to the newly created sorter, 
and then the
+   * pair of partition key and newly created sorter will be added into the 
hash map. If the
+   * size of hash map is above its maximum size, then all the rows that the 
hash map points
+   * to will be moved to the sort based merger.
+   */
+  @Override
+  public void insertRow(UnsafeRow row) throws IOException {
+    UnsafeRow windowSorterKey = this.partitionSpecProjection.apply(row);
+    AbstractUnsafeExternalRowSorter windowSorter = 
this.windowSorterMap.get(windowSorterKey);
+
+    if (windowSorter != null) {
+      windowSorter.insertRow(row);
+    } else if (this.windowSorterMap.size() == this.windowSorterMapMaxSize) {

Review comment:
       `==` -> `>=`?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
##########
@@ -42,6 +43,139 @@ case class SortExec(
     global: Boolean,
     child: SparkPlan,
     testSpillFrequency: Int = 0)
+  extends SortExecBase(
+    sortOrder,
+    global,
+    child,
+    testSpillFrequency) {
+
+  def createSorter(): UnsafeExternalRowSorter = {
+    rowSorter = UnsafeExternalRowSorter.create(
+      schema, ordering, prefixComparator, prefixComputer, pageSize, 
canUseRadixSort)
+
+    if (testSpillFrequency > 0) {
+      rowSorter.setTestSpillFrequency(testSpillFrequency)
+    }
+    rowSorter.asInstanceOf[UnsafeExternalRowSorter]
+  }
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+    doProduce(ctx, classOf[UnsafeExternalRowSorter].getName)
+  }
+}
+
+/**
+ * Performs (external) sorting for multiple windows.
+ *
+ * @param partitionSpec a sequence of expressions that defines a partition key
+ * @param sortOrderInWindow a sequence of sort orders for sorting rows inside 
a window
+ * @param sortOrderAcrossWindows a sequence of sort orders for sorting rows 
across
+ *                               different windows on a Spark physical 
partition.
+ *                               This sequence of sort orders is obtained from 
a partition
+ *                               key plus a sequence of sort orders inside a 
window
+ * @param global when true performs a global sort of all partitions by 
shuffling the data first
+ *               if necessary.
+ * @param testSpillFrequency Method for configuring periodic spilling in unit 
tests. If set, will
+ *                           spill every `frequency` records.
+ */
+case class WindowSortExec(
+    partitionSpec: Seq[Expression],
+    sortOrderInWindow: Seq[SortOrder],
+    sortOrderAcrossWindows: Seq[SortOrder],
+    global: Boolean,
+    child: SparkPlan,
+    testSpillFrequency: Int = 0)
+  extends SortExecBase(
+    sortOrderAcrossWindows,
+    global,
+    child,
+    testSpillFrequency) {
+
+  def createSorter(): UnsafeExternalRowWindowSorter = {

Review comment:
       ditto: we need `override` here.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
##########
@@ -55,53 +189,64 @@ case class SortExec(
   override def requiredChildDistribution: Seq[Distribution] =
     if (global) OrderedDistribution(sortOrder) :: Nil else 
UnspecifiedDistribution :: Nil
 
-  private val enableRadixSort = sqlContext.conf.enableRadixSort
+  val enableRadixSort = sqlContext.conf.enableRadixSort
+
+  lazy val boundSortExpression = BindReferences.bindReference(sortOrder.head, 
output)
+  lazy val ordering = RowOrdering.create(sortOrder, output)
+  lazy val sortPrefixExpr = SortPrefix(boundSortExpression)
+
+  // The comparator for comparing prefix
+  lazy val prefixComparator = 
SortPrefixUtils.getPrefixComparator(boundSortExpression)
+
+  // The generator for prefix
+  lazy val prefixComputer = createPrefixComputer(sortPrefixExpr)
+
+  lazy val canUseRadixSort = enableRadixSort && sortOrder.length == 1 &&
+    SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression)
+
+  lazy val pageSize = SparkEnv.get.memoryManager.pageSizeBytes

Review comment:
       Please add `protected` to the variables above.

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowWindowSorter.java
##########
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.collection.Iterator;
+import scala.math.Ordering;
+
+import org.apache.spark.memory.SparkOutOfMemoryError;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.collection.unsafe.sort.PrefixComparator;
+import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
+
+public final class UnsafeExternalRowWindowSorter extends 
AbstractUnsafeExternalRowSorter {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(UnsafeExternalRowWindowSorter.class);
+
+  private final StructType schema;
+  private final UnsafeProjection partitionSpecProjection;
+  private final Ordering<InternalRow> orderingOfPartitionKey;
+  private final Ordering<InternalRow> orderingInWindow;
+  private final Ordering<InternalRow> orderingAcrossWindows;
+  private final PrefixComparator prefixComparatorInWindow;
+  private final UnsafeExternalRowSorter.PrefixComputer prefixComputerInWindow;
+  private final boolean canUseRadixSortInWindow;
+  private final long pageSizeBytes;
+  private static final int windowSorterMapMaxSize = 1;
+  private static final int totalNumSorters = windowSorterMapMaxSize + 1;
+  private final HashMap<UnsafeRow,AbstractUnsafeExternalRowSorter> 
windowSorterMap;
+  private final UnsafeExternalRowSorter mainSorter;
+  private final RowComparator partitionKeyComparator;
+
+  private long numRowsInserted = 0;
+
+  private UnsafeExternalRowSorter createUnsafeExternalRowSorterForWindow() 
throws IOException {
+    UnsafeExternalRowSorter sorter = null;
+    try {
+      if (this.orderingInWindow == null) {

Review comment:
       When `orderingInWindow == null`, do we need `WindowSortExec`?

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowWindowSorter.java
##########
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.collection.Iterator;
+import scala.math.Ordering;
+
+import org.apache.spark.memory.SparkOutOfMemoryError;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.collection.unsafe.sort.PrefixComparator;
+import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
+
+public final class UnsafeExternalRowWindowSorter extends 
AbstractUnsafeExternalRowSorter {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(UnsafeExternalRowWindowSorter.class);
+
+  private final StructType schema;
+  private final UnsafeProjection partitionSpecProjection;
+  private final Ordering<InternalRow> orderingOfPartitionKey;
+  private final Ordering<InternalRow> orderingInWindow;
+  private final Ordering<InternalRow> orderingAcrossWindows;
+  private final PrefixComparator prefixComparatorInWindow;
+  private final UnsafeExternalRowSorter.PrefixComputer prefixComputerInWindow;
+  private final boolean canUseRadixSortInWindow;
+  private final long pageSizeBytes;
+  private static final int windowSorterMapMaxSize = 1;
+  private static final int totalNumSorters = windowSorterMapMaxSize + 1;
+  private final HashMap<UnsafeRow,AbstractUnsafeExternalRowSorter> 
windowSorterMap;
+  private final UnsafeExternalRowSorter mainSorter;
+  private final RowComparator partitionKeyComparator;
+
+  private long numRowsInserted = 0;
+
+  private UnsafeExternalRowSorter createUnsafeExternalRowSorterForWindow() 
throws IOException {
+    UnsafeExternalRowSorter sorter = null;
+    try {
+      if (this.orderingInWindow == null) {
+        sorter = UnsafeExternalRowSorter.createWithRecordComparator(
+          this.schema,
+          (Supplier<RecordComparator>)null,
+          prefixComparatorInWindow,
+          prefixComputerInWindow,
+          pageSizeBytes/totalNumSorters,
+          false);
+      } else {
+        sorter = UnsafeExternalRowSorter.create(
+          this.schema,
+          this.orderingInWindow,
+          this.prefixComparatorInWindow,
+          this.prefixComputerInWindow,
+          this.pageSizeBytes/totalNumSorters,
+          this.canUseRadixSortInWindow);
+      }
+    } catch (SparkOutOfMemoryError e) {
+      logger.error("Unable to create UnsafeExternalRowSorter due to 
SparkOutOfMemoryError.");
+      return null;
+    }
+    return sorter;
+  }
+
+  /**
+  * Returns an UnsafeExternalRowWindowSorter object.
+  * @param  schema  The schema of each input row
+  * @param  partitionSpecProjection an UnsafeProjection object created from
+  *                                 a sequence of partition expressions
+  * @param  orderingOfPartitionKey an ordering of internal rows that compares
+  *                                internal rows based on a partition key
+  * @param orderingInWindow an ordering of internal rows inside a window
+  * @param orderingAcrossWindows an ordering of internal rows across different
+  *                              windows on a Spark physical partition. This
+  *                              ordering is obtained from a partition key plus
+  *                              an ordering inside a window
+  * @param prefixComparatorInWindow a prefix comparator for sorting rows in a 
window
+  * @param prefixComparatorAcrossWindows a prefix comparator for sorting rows 
across
+  *                                      different windows on a Spark physical 
partition
+  * @param prefixComputerInWindow a prefix computer to calculate the prefix of 
a row
+  *                               based on the sort order inside a window
+  * @param prefixComputerAcrossWindows a prefix computer to calculate the 
prefix of a row
+  *                                    based on the sort order across 
different windows on
+  *                                    a Spark physical partition
+  * @param canUseRadixSortInWindow whether to use radix sort to sort the rows 
inside
+  *                                a window
+  * @param canUseRadixSortAcrossWindows whether to use radix sort to sort the 
rows across
+  *                                     different windows on a Spark physical 
partition
+  * @param pageSizeBytes the size of a page in bytes
+  */
+  public static UnsafeExternalRowWindowSorter create(
+      StructType schema,
+      UnsafeProjection partitionSpecProjection,
+      Ordering<InternalRow> orderingOfPartitionKey,
+      Ordering<InternalRow> orderingInWindow,
+      Ordering<InternalRow> orderingAcrossWindows,
+      PrefixComparator prefixComparatorInWindow,
+      PrefixComparator prefixComparatorAcrossWindows,
+      UnsafeExternalRowSorter.PrefixComputer prefixComputerInWindow,
+      UnsafeExternalRowSorter.PrefixComputer prefixComputerAcrossWindows,
+      boolean canUseRadixSortInWindow,
+      boolean canUseRadixSortAcrossWindows,
+      long pageSizeBytes) throws IOException {
+    UnsafeExternalRowSorter mainSorter = UnsafeExternalRowSorter.create(
+      schema,
+      orderingAcrossWindows,
+      prefixComparatorAcrossWindows,
+      prefixComputerAcrossWindows,
+      pageSizeBytes/totalNumSorters,
+      canUseRadixSortAcrossWindows);
+
+    return new UnsafeExternalRowWindowSorter(
+      mainSorter,
+      schema,
+      partitionSpecProjection,
+      orderingOfPartitionKey,
+      orderingInWindow,
+      orderingAcrossWindows,
+      prefixComparatorInWindow,
+      prefixComputerInWindow,
+      canUseRadixSortInWindow,
+      pageSizeBytes);
+  }
+
+  private UnsafeExternalRowWindowSorter(
+      UnsafeExternalRowSorter mainSorter,
+      StructType schema,
+      UnsafeProjection partitionSpecProjection,
+      Ordering<InternalRow> orderingOfPartitionKey,
+      Ordering<InternalRow> orderingInWindow,
+      Ordering<InternalRow> orderingAcrossWindows,
+      PrefixComparator prefixComparatorInWindow,
+      UnsafeExternalRowSorter.PrefixComputer prefixComputerInWindow,
+      boolean canUseRadixSortInWindow,
+      long pageSizeBytes) {
+    this.mainSorter = mainSorter;
+    this.schema = schema;
+    this.partitionSpecProjection = partitionSpecProjection;
+    this.orderingOfPartitionKey = orderingOfPartitionKey;
+    this.orderingInWindow = orderingInWindow;
+    this.orderingAcrossWindows = orderingAcrossWindows;
+    this.prefixComparatorInWindow = prefixComparatorInWindow;
+    this.prefixComputerInWindow = prefixComputerInWindow;
+    this.canUseRadixSortInWindow = canUseRadixSortInWindow;
+    this.pageSizeBytes = pageSizeBytes;
+    this.windowSorterMap = new 
HashMap<UnsafeRow,AbstractUnsafeExternalRowSorter>(
+      windowSorterMapMaxSize);
+    this.partitionKeyComparator = new RowComparator(orderingOfPartitionKey);
+  }
+
+  /**
+   * If the partition key is found in the hash map, then the row will be inserted into the
+   * unsafe external row sorter corresponding to the partition key. Otherwise a new unsafe
+   * external row sorter will be created, this row will be added to the newly created
+   * sorter, and then the
+   * pair of partition key and newly created sorter will be added into the 
hash map. If the
+   * size of hash map is above its maximum size, then all the rows that the 
hash map points
+   * to will be moved to the sort based merger.
+   */
+  @Override
+  public void insertRow(UnsafeRow row) throws IOException {
+    UnsafeRow windowSorterKey = this.partitionSpecProjection.apply(row);
+    AbstractUnsafeExternalRowSorter windowSorter = 
this.windowSorterMap.get(windowSorterKey);
+
+    if (windowSorter != null) {
+      windowSorter.insertRow(row);
+    } else if (this.windowSorterMap.size() == this.windowSorterMapMaxSize) {
+      this.mainSorter.insertRow(row);

Review comment:
       nit: `mainSorter` -> `fallbackSorter`?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
##########
@@ -42,6 +43,139 @@ case class SortExec(
     global: Boolean,
     child: SparkPlan,
     testSpillFrequency: Int = 0)
+  extends SortExecBase(
+    sortOrder,
+    global,
+    child,
+    testSpillFrequency) {
+
+  def createSorter(): UnsafeExternalRowSorter = {
+    rowSorter = UnsafeExternalRowSorter.create(
+      schema, ordering, prefixComparator, prefixComputer, pageSize, 
canUseRadixSort)
+
+    if (testSpillFrequency > 0) {
+      rowSorter.setTestSpillFrequency(testSpillFrequency)
+    }
+    rowSorter.asInstanceOf[UnsafeExternalRowSorter]
+  }
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+    doProduce(ctx, classOf[UnsafeExternalRowSorter].getName)
+  }
+}
+
+/**
+ * Performs (external) sorting for multiple windows.
+ *
+ * @param partitionSpec a sequence of expressions that defines a partition key
+ * @param sortOrderInWindow a sequence of sort orders for sorting rows inside 
a window
+ * @param sortOrderAcrossWindows a sequence of sort orders for sorting rows 
across
+ *                               different windows on a Spark physical 
partition.
+ *                               This sequence of sort orders is obtained from 
a partition
+ *                               key plus a sequence of sort orders inside a 
window
+ * @param global when true performs a global sort of all partitions by 
shuffling the data first
+ *               if necessary.
+ * @param testSpillFrequency Method for configuring periodic spilling in unit 
tests. If set, will
+ *                           spill every `frequency` records.
+ */
+case class WindowSortExec(
+    partitionSpec: Seq[Expression],
+    sortOrderInWindow: Seq[SortOrder],
+    sortOrderAcrossWindows: Seq[SortOrder],
+    global: Boolean,

Review comment:
       It seems we don't need `global` for this node.

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowWindowSorter.java
##########
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.collection.Iterator;
+import scala.math.Ordering;
+
+import org.apache.spark.memory.SparkOutOfMemoryError;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.collection.unsafe.sort.PrefixComparator;
+import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
+
+public final class UnsafeExternalRowWindowSorter extends 
AbstractUnsafeExternalRowSorter {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(UnsafeExternalRowWindowSorter.class);
+
+  private final StructType schema;
+  private final UnsafeProjection partitionSpecProjection;
+  private final Ordering<InternalRow> orderingOfPartitionKey;
+  private final Ordering<InternalRow> orderingInWindow;
+  private final Ordering<InternalRow> orderingAcrossWindows;
+  private final PrefixComparator prefixComparatorInWindow;
+  private final UnsafeExternalRowSorter.PrefixComputer prefixComputerInWindow;
+  private final boolean canUseRadixSortInWindow;
+  private final long pageSizeBytes;
+  private static final int windowSorterMapMaxSize = 1;

Review comment:
       I have the same question. Could you parameterize it via `SQLConf`?

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowWindowSorter.java
##########
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.collection.Iterator;
+import scala.math.Ordering;
+
+import org.apache.spark.memory.SparkOutOfMemoryError;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.collection.unsafe.sort.PrefixComparator;
+import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
+
+public final class UnsafeExternalRowWindowSorter extends 
AbstractUnsafeExternalRowSorter {

Review comment:
       Could you leave some comments about what's a difference from 
`UnsafeExternalRowSorter`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to