xuzikun2003 commented on a change in pull request #29725: URL: https://github.com/apache/spark/pull/29725#discussion_r525862140
##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowWindowSorter.java
##########

@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.collection.Iterator;
+import scala.math.Ordering;
+
+import org.apache.spark.memory.SparkOutOfMemoryError;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.collection.unsafe.sort.PrefixComparator;
+import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
+
+public final class UnsafeExternalRowWindowSorter extends AbstractUnsafeExternalRowSorter {
+
+  private static final Logger logger =
+    LoggerFactory.getLogger(UnsafeExternalRowWindowSorter.class);
+
+  private final StructType schema;
+  private final UnsafeProjection partitionSpecProjection;
+  private final Ordering<InternalRow> orderingOfPartitionKey;
+  private final Ordering<InternalRow> orderingInWindow;
+  private final Ordering<InternalRow> orderingAcrossWindows;
+  private final PrefixComparator prefixComparatorInWindow;
+  private final UnsafeExternalRowSorter.PrefixComputer prefixComputerInWindow;
+  private final boolean canUseRadixSortInWindow;
+  private final long pageSizeBytes;
+  private static final int windowSorterMapMaxSize = 1;

Review comment:

@maropu, @opensky142857, here are the reasons why we set `windowSorterMapMaxSize` to 1 and why we reduce the page size of each sorter.

Each `UnsafeExternalRowSorter` uses its own memory consumer. When the first row is inserted into an `UnsafeExternalRowSorter`, that sorter's memory consumer allocates it a whole page. In our TPCDS 100TB perf run, the default page size is 64MB. If only a few rows are inserted into the sorter for a given window, a lot of memory is wasted, and the unnecessary allocations also add significant performance overhead. That is why this PR does two things:
1. Keep the number of window sorters small.
2. Decrease the page size of each window sorter.

There are two possible directions for addressing this problem. The first is to let the window sorters share a single memory consumer, so that we never allocate many large pages that receive only a few rows each; however, this requires substantial engineering effort to refactor `UnsafeExternalSorter`. The second is to keep only one window sorter per physical partition.

Here is why we chose the second direction. In the TPCDS 100TB run, we do not see the Spark engine being slow at sorting many windows within a physical partition. We see it being slow at sorting a single window within a single physical partition (q67 is such a case), with the executor performing many unnecessary comparisons on the window partition key. Following the second direction, the single window sorter in each physical partition does not need to compare the window partition key, so it runs almost 2x faster.

Perhaps I can rename these parameters to avoid confusion. What do you think?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
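The waste described above can be sketched with a toy model. This is plain Java, not Spark's actual `MemoryConsumer`/`UnsafeExternalSorter` API; the 64MB page size matches the default mentioned in the comment, and the row/window counts are illustrative. It only models the one behavior at issue: each sorter claims a whole page on its first insert, so one sorter per window key allocates far more than one sorter per physical partition holding the same rows.

```java
import java.util.ArrayList;
import java.util.List;

class ToyPagedSorter {
    // Hypothetical default page size (64MB, as in the perf run described above).
    static final long PAGE_SIZE_BYTES = 64L * 1024 * 1024;
    private long allocated = 0;
    private int rows = 0;

    void insertRow() {
        if (rows == 0) {
            // A whole page is claimed on the first insert,
            // regardless of how many rows will follow.
            allocated += PAGE_SIZE_BYTES;
        }
        rows++;
    }

    long allocatedBytes() { return allocated; }
}

public class WindowSorterMemoryDemo {
    public static void main(String[] args) {
        // Direction not taken: one sorter per window key.
        // 1000 windows with a single row each -> 1000 pages.
        List<ToyPagedSorter> perWindow = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            ToyPagedSorter s = new ToyPagedSorter();
            s.insertRow();
            perWindow.add(s);
        }
        long manySorters =
            perWindow.stream().mapToLong(ToyPagedSorter::allocatedBytes).sum();

        // Direction taken by the PR: one sorter per physical partition.
        // The same 1000 rows land in a single sorter -> one page.
        ToyPagedSorter single = new ToyPagedSorter();
        for (int i = 0; i < 1000; i++) {
            single.insertRow();
        }

        System.out.println("per-window sorters: " + (manySorters >> 20) + " MB");
        System.out.println("single sorter:      " + (single.allocatedBytes() >> 20) + " MB");
    }
}
```

Under these toy numbers the per-window layout allocates 64000 MB against 64 MB for the single sorter, which is why the PR both caps the number of window sorters and shrinks their page size.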
