This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 9e86aed7b2a [SPARK-44588][CORE][3.3] Fix double encryption issue for migrated shuffle blocks 9e86aed7b2a is described below commit 9e86aed7b2ac3f9c18346a290b9672b0d9465805 Author: Henry Mai <henry...@users.noreply.github.com> AuthorDate: Tue Aug 1 21:07:51 2023 -0700 [SPARK-44588][CORE][3.3] Fix double encryption issue for migrated shuffle blocks ### What changes were proposed in this pull request? Fix double encryption issue for migrated shuffle blocks Shuffle blocks upon migration are sent without decryption when io.encryption is enabled. The code on the receiving side ends up using serializer.wrapStream on the OutputStream to the file which results in the already encrypted bytes being encrypted again when the bytes are written out. This patch removes the usage of serializerManager.wrapStream on the receiving side and also adds tests that validate that this works as expected. I have also validated that the added tests will fail if the fix is not in place. Jira ticket with more details: https://issues.apache.org/jira/browse/SPARK-44588 ### Why are the changes needed? Migrated shuffle blocks will be double encrypted when `spark.io.encryption = true` without this fix. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests were added to test shuffle block migration with spark.io.encryption enabled and also fixes a test helper method to properly construct the SerializerManager with the encryption key. Closes #42277 from henrymai/branch-3.3_backport_double_encryption. Authored-by: Henry Mai <henry...@users.noreply.github.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../spark/shuffle/IndexShuffleBlockResolver.scala | 8 ++++--- .../apache/spark/storage/BlockManagerSuite.scala | 28 ++++++++++++++++++---- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index ba54555311e..d41321b4597 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -240,9 +240,11 @@ private[spark] class IndexShuffleBlockResolver( s"${blockId.getClass().getSimpleName()}") } val fileTmp = createTempFile(file) - val channel = Channels.newChannel( - serializerManager.wrapStream(blockId, - new FileOutputStream(fileTmp))) + + // Shuffle blocks' file bytes are being sent directly over the wire, so there is no need to + // serializerManager.wrapStream() on it. Meaning if it was originally encrypted, then + // it will stay encrypted when being written out to the file here. + val channel = Channels.newChannel(new FileOutputStream(fileTmp)) new StreamCallbackWithID { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 6bfffc8ab3d..986ac79953d 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -133,7 +133,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val transfer = transferService .getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)) val memManager = UnifiedMemoryManager(bmConf, numCores = 1) - val serializerManager = new SerializerManager(serializer, bmConf) + val serializerManager = new SerializerManager(serializer, bmConf, encryptionKey) val externalShuffleClient = if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) { val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", 0) Some(new ExternalBlockStoreClient(transConf, bmSecurityMgr, @@ -2027,10 +2027,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId)) } - private def testShuffleBlockDecommissioning(maxShuffleSize: Option[Int], willReject: Boolean) = { + private def testShuffleBlockDecommissioning( + maxShuffleSize: Option[Int], willReject: Boolean, enableIoEncryption: Boolean) = { maxShuffleSize.foreach{ size => conf.set(STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE.key, s"${size}b") } + conf.set(IO_ENCRYPTION_ENABLED, enableIoEncryption) + val shuffleManager1 = makeSortShuffleManager(Some(conf)) val bm1 = makeBlockManager(3500, "exec1", shuffleManager = shuffleManager1) shuffleManager1.shuffleBlockResolver._blockManager = bm1 @@ -2089,15 +2092,30 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } test("test migration of shuffle blocks during decommissioning - no limit") { - testShuffleBlockDecommissioning(None, true) + testShuffleBlockDecommissioning(None, true, false) + } + + test("test migration of shuffle blocks during decommissioning - no limit - " + + "io.encryption enabled") { + testShuffleBlockDecommissioning(None, true, true) } test("test migration of shuffle blocks during decommissioning - larger limit") { - testShuffleBlockDecommissioning(Some(10000), true) + testShuffleBlockDecommissioning(Some(10000), true, false) + } + + test("test migration of shuffle blocks during decommissioning - larger limit - " + + "io.encryption enabled") { + testShuffleBlockDecommissioning(Some(10000), true, true) } test("[SPARK-34363]test migration of shuffle blocks during decommissioning - small limit") { - testShuffleBlockDecommissioning(Some(1), false) + testShuffleBlockDecommissioning(Some(1), false, false) + } + + test("[SPARK-34363]test migration of shuffle blocks during decommissioning - small limit -" + + " io.encryption enabled") { + testShuffleBlockDecommissioning(Some(1), false, true) } test("SPARK-32919: Shuffle push merger locations should be bounded with in" + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org