Github user jerryshao commented on a diff in the pull request:
https://github.com/apache/spark/pull/3438#discussion_r28210315
--- Diff: core/src/main/scala/org/apache/spark/util/collection/TieredDiskMerger.scala ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io.File
+import java.util.Comparator
+import java.util.concurrent.{PriorityBlockingQueue, CountDownLatch}
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark._
+import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.storage.BlockId
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.util.CompletionIterator
+
+/**
+ * Manages blocks of sorted data on disk that need to be merged together. Carries out a tiered
+ * merge that will never merge more than spark.shuffle.maxMergeFactor segments at a time.
+ * Except for the final merge, which merges disk blocks to a returned iterator, TieredDiskMerger
+ * merges blocks from disk to disk.
+ *
+ * TieredDiskMerger carries out disk-to-disk merges in a background thread that can run
+ * concurrently with blocks being deposited on disk.
+ *
+ * When deciding which blocks to merge, it first tries to minimize the number of blocks, and
+ * then the size of the blocks chosen.
+ */
+private[spark] class TieredDiskMerger[K, C](
+ conf: SparkConf,
+ dep: ShuffleDependency[K, _, C],
+ keyComparator: Comparator[K],
+ context: TaskContext) extends Logging {
+
+ /** An on-disk shuffle block: its block id, backing file, and length in bytes */
+ case class DiskShuffleBlock(blockId: BlockId, file: File, len: Long)
+ extends Comparable[DiskShuffleBlock] {
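+ // Orders by file length, so a priority queue yields the smallest blocks first.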
+ def compareTo(o: DiskShuffleBlock): Int = len.compare(o.len)
+ }
+
+ private val maxMergeFactor = conf.getInt("spark.shuffle.maxMergeFactor", 100)
+ private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
+
+ private val blockManager = SparkEnv.get.blockManager
+ private val ser = Serializer.getSerializer(dep.serializer)
+
+ /** PriorityQueue of on-disk blocks awaiting merge; blocks are merged smallest-first */
+ private val onDiskBlocks = new PriorityBlockingQueue[DiskShuffleBlock]()
+
+ /** A merging thread to merge on-disk blocks */
+ private val diskToDiskMerger = new DiskToDiskMerger
+
+ /** Monitor used to block the merging thread and signal it when a merge should start */
+ private val mergeReadyMonitor = new AnyRef()
+
+ private val mergeFinished = new CountDownLatch(1)
+
+ /** Set to true once no more on-disk blocks will be registered */
+ @volatile private var doneRegistering = false
+
+ /** Number of bytes spilled on disk */
+ private var _diskBytesSpilled: Long = 0L
+
+ def diskBytesSpilled: Long = _diskBytesSpilled
+
+ def registerOnDiskBlock(blockId: BlockId, file: File): Unit = {
+ assert(!doneRegistering)
+ onDiskBlocks.put(new DiskShuffleBlock(blockId, file, file.length()))
+
+ mergeReadyMonitor.synchronized {
+ if (shouldMergeNow()) {
+ mergeReadyMonitor.notify()
+ }
+ }
+ }
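+ // shouldMergeNow() is defined further down in this file, outside this excerpt.
+ // Presumably the background DiskToDiskMerger thread waits on mergeReadyMonitor
+ // and wakes up here once enough blocks have accumulated for a disk-to-disk merge.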
+
+ /**
+ * Notify the merger that no more on disk blocks will be registered.
+ */
+ def doneRegisteringOnDiskBlocks(): Unit = {
+ doneRegistering = true
--- End diff --
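
To make the tiered-merge policy described in the class comment concrete, here is a
small, self-contained sketch (illustrative only, not code from this patch) of the
arithmetic a tiered merger can use to keep the final merge at or below the merge
factor while merging the smallest blocks first:

object MergePlanSketch {
  // Size of the next intermediate disk-to-disk merge: fold just enough of the
  // smallest blocks (never more than f at once) so the count shrinks toward f.
  def nextMergeSize(n: Int, f: Int): Int =
    if (n <= f) 0 else math.min(f, n - f + 1)

  def main(args: Array[String]): Unit = {
    val f = 100
    var n = 250
    while (n > f) {
      val k = nextMergeSize(n, f)
      println(s"merge $k of $n blocks -> ${n - k + 1} remain")
      n = n - k + 1
    }
    // With 250 blocks: merge 100 -> 151 remain, then merge 52 -> 100 remain;
    // the final merge then reads exactly 100 segments at once.
  }
}
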
Thanks a lot @lianhuiwang for your comments; we've also hit this issue when running queries. I will fix this ASAP.