opensky142857 commented on a change in pull request #29725: URL: https://github.com/apache/spark/pull/29725#discussion_r525993298
########## File path: sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowWindowSorter.java ########## @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution; + +import com.google.common.annotations.VisibleForTesting; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map.Entry; +import java.util.Queue; +import java.util.TreeMap; +import java.io.IOException; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.collection.Iterator; +import scala.math.Ordering; + +import org.apache.spark.memory.SparkOutOfMemoryError; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection; +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.util.collection.unsafe.sort.PrefixComparator; +import org.apache.spark.util.collection.unsafe.sort.RecordComparator; + +public final class UnsafeExternalRowWindowSorter extends AbstractUnsafeExternalRowSorter { + + private static final Logger logger = 
LoggerFactory.getLogger(UnsafeExternalRowWindowSorter.class); + + private final StructType schema; + private final UnsafeProjection partitionSpecProjection; + private final Ordering<InternalRow> orderingOfPartitionKey; + private final Ordering<InternalRow> orderingInWindow; + private final Ordering<InternalRow> orderingAcrossWindows; + private final PrefixComparator prefixComparatorInWindow; + private final UnsafeExternalRowSorter.PrefixComputer prefixComputerInWindow; + private final boolean canUseRadixSortInWindow; + private final long pageSizeBytes; + private static final int windowSorterMapMaxSize = 1; + private static final int totalNumSorters = windowSorterMapMaxSize + 1; + private final HashMap<UnsafeRow,AbstractUnsafeExternalRowSorter> windowSorterMap; + private final UnsafeExternalRowSorter mainSorter; + private final RowComparator partitionKeyComparator; + + private long numRowsInserted = 0; + + private UnsafeExternalRowSorter createUnsafeExternalRowSorterForWindow() throws IOException { + UnsafeExternalRowSorter sorter = null; + try { + if (this.orderingInWindow == null) { + sorter = UnsafeExternalRowSorter.createWithRecordComparator( + this.schema, + (Supplier<RecordComparator>)null, + prefixComparatorInWindow, + prefixComputerInWindow, + pageSizeBytes/totalNumSorters, + false); + } else { + sorter = UnsafeExternalRowSorter.create( + this.schema, + this.orderingInWindow, + this.prefixComparatorInWindow, + this.prefixComputerInWindow, + this.pageSizeBytes/totalNumSorters, + this.canUseRadixSortInWindow); + } + } catch (SparkOutOfMemoryError e) { + logger.error("Unable to create UnsafeExternalRowSorter due to SparkOutOfMemoryError."); + return null; + } + return sorter; + } + + /** + * Returns an UnsafeExternalRowWindowSorter object. 
+ * @param schema The schema of each input row + * @param partitionSpecProjection an UnsafeProjection object created from + * a sequence of partition expressions + * @param orderingOfPartitionKey an ordering of internal rows that compares + * internal rows based on a partition key + * @param orderingInWindow an ordering of internal rows inside a window + * @param orderingAcrossWindows an ordering of internal rows across different + * windows on a Spark physical partition. This + * ordering is obtained from a partition key plus + * an ordering inside a window + * @param prefixComparatorInWindow a prefix comparator for sorting rows in a window + * @param prefixComparatorAcrossWindows a prefix comparator for sorting rows across + * different windows on a Spark physical partition + * @param prefixComputerInWindow a prefix computer to calculate the prefix of a row + * based on the sort order inside a window + * @param prefixComputerAcrossWindows a prefix computer to calculate the prefix of a row + * based on the sort order across different windows on + * a Spark physical partition + * @param canUseRadixSortInWindow whether to use radix sort to sort the rows inside + * a window + * @param canUseRadixSortAcrossWindows whether to use radix sort to sort the rows across + * different windows on a Spark physical partition + * @param pageSizeBytes the size of a page in bytes + */ + public static UnsafeExternalRowWindowSorter create( + StructType schema, + UnsafeProjection partitionSpecProjection, + Ordering<InternalRow> orderingOfPartitionKey, + Ordering<InternalRow> orderingInWindow, + Ordering<InternalRow> orderingAcrossWindows, + PrefixComparator prefixComparatorInWindow, + PrefixComparator prefixComparatorAcrossWindows, + UnsafeExternalRowSorter.PrefixComputer prefixComputerInWindow, + UnsafeExternalRowSorter.PrefixComputer prefixComputerAcrossWindows, + boolean canUseRadixSortInWindow, + boolean canUseRadixSortAcrossWindows, + long pageSizeBytes) throws IOException { + 
UnsafeExternalRowSorter mainSorter = UnsafeExternalRowSorter.create( + schema, + orderingAcrossWindows, + prefixComparatorAcrossWindows, + prefixComputerAcrossWindows, + pageSizeBytes/totalNumSorters, + canUseRadixSortAcrossWindows); + + return new UnsafeExternalRowWindowSorter( + mainSorter, + schema, + partitionSpecProjection, + orderingOfPartitionKey, + orderingInWindow, + orderingAcrossWindows, + prefixComparatorInWindow, + prefixComputerInWindow, + canUseRadixSortInWindow, + pageSizeBytes); + } + + private UnsafeExternalRowWindowSorter( + UnsafeExternalRowSorter mainSorter, + StructType schema, + UnsafeProjection partitionSpecProjection, + Ordering<InternalRow> orderingOfPartitionKey, + Ordering<InternalRow> orderingInWindow, + Ordering<InternalRow> orderingAcrossWindows, + PrefixComparator prefixComparatorInWindow, + UnsafeExternalRowSorter.PrefixComputer prefixComputerInWindow, + boolean canUseRadixSortInWindow, + long pageSizeBytes) { + this.mainSorter = mainSorter; + this.schema = schema; + this.partitionSpecProjection = partitionSpecProjection; + this.orderingOfPartitionKey = orderingOfPartitionKey; + this.orderingInWindow = orderingInWindow; + this.orderingAcrossWindows = orderingAcrossWindows; + this.prefixComparatorInWindow = prefixComparatorInWindow; + this.prefixComputerInWindow = prefixComputerInWindow; + this.canUseRadixSortInWindow = canUseRadixSortInWindow; + this.pageSizeBytes = pageSizeBytes; + this.windowSorterMap = new HashMap<UnsafeRow,AbstractUnsafeExternalRowSorter>( + windowSorterMapMaxSize); + this.partitionKeyComparator = new RowComparator(orderingOfPartitionKey); + } + + /** + * If the partition key is found in the hash map, then the rows will be inserted to the + * unsafe external row sorter corresponding to the partition key. Otherwise a new unsafe + * will be created, and this row will be added to the newly created sorter, and then the + * pair of partition key and newly created sorter will be added into the hash map. 
If the + * size of hash map is above its maximum size, then all the rows that the hash map points + * to will be moved to the sort based merger. + */ + @Override + public void insertRow(UnsafeRow row) throws IOException { + UnsafeRow windowSorterKey = this.partitionSpecProjection.apply(row); + AbstractUnsafeExternalRowSorter windowSorter = this.windowSorterMap.get(windowSorterKey); + + if (windowSorter != null) { + windowSorter.insertRow(row); + } else if (this.windowSorterMap.size() == this.windowSorterMapMaxSize) { + this.mainSorter.insertRow(row); + } else { + AbstractUnsafeExternalRowSorter sorter = createUnsafeExternalRowSorterForWindow(); + + if (sorter == null) { + this.mainSorter.spill(); Review comment: If we fail to create the new sorter, why do we need to spill the main sorter? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
