MasseGuillaume commented on code in PR #50474: URL: https://github.com/apache/spark/pull/50474#discussion_r2109688257
########## core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala: ##########
@@ -1014,39 +1014,44 @@ final class ShuffleBlockFetcherIterator(
         // a SuccessFetchResult or a FailureFetchResult.
         result = null

-      case PushMergedLocalMetaFetchResult(
-          shuffleId, shuffleMergeId, reduceId, bitmaps, localDirs) =>
-        // Fetch push-merged-local shuffle block data as multiple shuffle chunks
-        val shuffleBlockId = ShuffleMergedBlockId(shuffleId, shuffleMergeId, reduceId)
-        try {
-          val bufs: Seq[ManagedBuffer] = blockManager.getLocalMergedBlockData(shuffleBlockId,
-            localDirs)
-          // Since the request for local block meta completed successfully, numBlocksToFetch
-          // is decremented.
-          numBlocksToFetch -= 1
-          // Update total number of blocks to fetch, reflecting the multiple local shuffle
-          // chunks.
-          numBlocksToFetch += bufs.size
-          bufs.zipWithIndex.foreach { case (buf, chunkId) =>
-            buf.retain()
-            val shuffleChunkId = ShuffleBlockChunkId(shuffleId, shuffleMergeId, reduceId,
-              chunkId)
-            pushBasedFetchHelper.addChunk(shuffleChunkId, bitmaps(chunkId))
-            results.put(SuccessFetchResult(shuffleChunkId, SHUFFLE_PUSH_MAP_ID,
-              pushBasedFetchHelper.localShuffleMergerBlockMgrId, buf.size(), buf,
-              isNetworkReqDone = false))
-          }
-        } catch {
-          case e: Exception =>
-            // If we see an exception with reading push-merged-local index file, we fallback
-            // to fetch the original blocks. We do not report block fetch failure
-            // and will continue with the remaining local block read.
-            logWarning("Error occurred while reading push-merged-local index, " +
-              "prepare to fetch the original blocks", e)
-            pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(
-              shuffleBlockId, pushBasedFetchHelper.localShuffleMergerBlockMgrId)
+      case PushMergedLocalMetaFetchResult(

Review Comment:

I compiled most of the Spark release dates in relation to Scala versions in this document: https://docs.google.com/spreadsheets/d/1yq6mO6x-xPYR4gxHbFjF9KfTsY6j8ceUmaZeoo0Ozlc/edit?gid=1454157208#gid=1454157208

I eventually want to write a proper article about this, but here is the gist of my findings:

Spark used to be quick to adopt new Scala versions (about a year each for 2.10 and 2.11); as it matured it became much slower (3 years 7 months for 2.12 and 2 years 4 months for 2.13).

When Spark supports multiple Scala versions, for example 2.12 and 2.13 since Spark 3.2.0, commercial platforms such as AWS EMR and Databricks only support the lowest one. This gives us:

* Scala 2.11 from 2016-07-27 to 2020-03-10 on EMR
* Scala 2.12 from ? 2020 until now (Spark 4.0.0 drops Scala 2.12 in favour of 2.13)

With Spark 4.0 on 2.13, this means we can finally [run Scala 3 on a Spark cluster](https://xebia.com/blog/using-scala-3-with-spark/#running-on-a-real-spark-cluster) (with some small [tweaks](https://vincenzobaz.github.io/spark-scala3/how_it_works.html)).

With Databricks' early adoption of [Scala 2.13](https://docs.databricks.com/aws/en/release-notes/runtime/16.4lts), I expect other vendors to follow.
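For readers following the "small tweaks" links above, here is a minimal, hypothetical sketch of the cross-building approach the linked Xebia article describes: a Scala 3 sbt project consuming the Scala 2.13 Spark artifacts via sbt's `CrossVersion.for3Use2_13`. The Scala and Spark version numbers are illustrative only and are not taken from this PR.

```scala
// build.sbt -- minimal sketch, assuming sbt 1.5+ and a Spark release published
// for Scala 2.13; version numbers are illustrative, not from this PR.
ThisBuild / scalaVersion := "3.3.4"

libraryDependencies ++= Seq(
  // Spark is published for Scala 2.13, not Scala 3, so resolve the _2.13
  // artifacts from this Scala 3 build. This works because Scala 3 can consume
  // Scala 2.13 class files (the reverse is not true).
  ("org.apache.spark" %% "spark-sql" % "4.0.0" % "provided")
    .cross(CrossVersion.for3Use2_13)
)
```

Plain DataFrame code compiles and runs against this setup; deriving `Dataset` encoders for case classes from Scala 3 sources is the extra piece that the spark-scala3 project linked above provides, since Spark's built-in encoder derivation relies on Scala 2 implicits and runtime reflection.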
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org