This is an automated email from the ASF dual-hosted git repository. zsxwing pushed a commit to branch branch-2.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.3 by this push: new b88067b [SPARK-26665][CORE] Fix a bug that BlockTransferService.fetchBlockSync may hang forever b88067b is described below commit b88067bd0f7b9466a89ce6458cb7766a24283b13 Author: Shixiong Zhu <zsxw...@gmail.com> AuthorDate: Tue Jan 22 09:00:52 2019 -0800 [SPARK-26665][CORE] Fix a bug that BlockTransferService.fetchBlockSync may hang forever ## What changes were proposed in this pull request? `ByteBuffer.allocate` may throw `OutOfMemoryError` when the block is large but not enough memory is available. However, when this happens, right now BlockTransferService.fetchBlockSync will just hang forever as its `BlockFetchingListener.onBlockFetchSuccess` doesn't complete the `Promise`. This PR catches `Throwable` and uses the error to complete the `Promise`. ## How was this patch tested? Added a unit test. Since I cannot make `ByteBuffer.allocate` throw `OutOfMemoryError`, I passed a negative size to make `ByteBuffer.allocate` fail. Although the error type is different, it should trigger the same code path. Closes #23590 from zsxwing/SPARK-26665. 
Authored-by: Shixiong Zhu <zsxw...@gmail.com> Signed-off-by: Shixiong Zhu <zsxw...@gmail.com> --- .../spark/network/BlockTransferService.scala | 12 ++- .../spark/network/BlockTransferServiceSuite.scala | 104 +++++++++++++++++++++ 2 files changed, 112 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala index eef8c31..875e4fc 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala @@ -105,10 +105,14 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo case f: FileSegmentManagedBuffer => result.success(f) case _ => - val ret = ByteBuffer.allocate(data.size.toInt) - ret.put(data.nioByteBuffer()) - ret.flip() - result.success(new NioManagedBuffer(ret)) + try { + val ret = ByteBuffer.allocate(data.size.toInt) + ret.put(data.nioByteBuffer()) + ret.flip() + result.success(new NioManagedBuffer(ret)) + } catch { + case e: Throwable => result.failure(e) + } } } }, tempFileManager) diff --git a/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala b/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala new file mode 100644 index 0000000..d7e4b91 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network + +import java.io.InputStream +import java.nio.ByteBuffer + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.reflect.ClassTag + +import org.scalatest.concurrent._ + +import org.apache.spark.{SparkException, SparkFunSuite} +import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager} +import org.apache.spark.storage.{BlockId, StorageLevel} + +class BlockTransferServiceSuite extends SparkFunSuite with TimeLimits { + + implicit val defaultSignaler: Signaler = ThreadSignaler + + test("fetchBlockSync should not hang when BlockFetchingListener.onBlockFetchSuccess fails") { + // Create a mocked `BlockTransferService` to call `BlockFetchingListener.onBlockFetchSuccess` + // with a bad `ManagedBuffer` which will trigger an exception in `onBlockFetchSuccess`. 
+ val blockTransferService = new BlockTransferService { + override def init(blockDataManager: BlockDataManager): Unit = {} + + override def close(): Unit = {} + + override def port: Int = 0 + + override def hostName: String = "localhost-unused" + + override def fetchBlocks( + host: String, + port: Int, + execId: String, + blockIds: Array[String], + listener: BlockFetchingListener, + tempFileManager: DownloadFileManager): Unit = { + // Notify BlockFetchingListener with a bad ManagedBuffer asynchronously + new Thread() { + override def run(): Unit = { + // This is a bad buffer to trigger `IllegalArgumentException` in + // `BlockFetchingListener.onBlockFetchSuccess`. The real issue we hit is + // `ByteBuffer.allocate` throws `OutOfMemoryError`, but we cannot make it happen in + // a test. Instead, we use a negative size value to make `ByteBuffer.allocate` fail, + // and this should trigger the same code path as `OutOfMemoryError`. + val badBuffer = new ManagedBuffer { + override def size(): Long = -1 + + override def nioByteBuffer(): ByteBuffer = null + + override def createInputStream(): InputStream = null + + override def retain(): ManagedBuffer = this + + override def release(): ManagedBuffer = this + + override def convertToNetty(): AnyRef = null + } + listener.onBlockFetchSuccess("block-id-unused", badBuffer) + } + }.start() + } + + override def uploadBlock( + hostname: String, + port: Int, + execId: String, + blockId: BlockId, + blockData: ManagedBuffer, + level: StorageLevel, + classTag: ClassTag[_]): Future[Unit] = { + // This method is unused in this test + throw new UnsupportedOperationException("uploadBlock") + } + } + + val e = intercept[SparkException] { + failAfter(10.seconds) { + blockTransferService.fetchBlockSync( + "localhost-unused", 0, "exec-id-unused", "block-id-unused", null) + } + } + assert(e.getCause.isInstanceOf[IllegalArgumentException]) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org