cxzl25 commented on pull request #28286: URL: https://github.com/apache/spark/pull/28286#issuecomment-682019408
I hit incorrect query results in our production environment. After several days of investigation, I found that this patch fixes the problem. **I recommend backporting the fix to earlier release branches, because without this patch, results may be incorrect.** @HyukjinKwon @dongjoon-hyun

Before this patch, `BytesToBytesMap.iterator` was not thread-safe, and `BroadcastExchangeExec` runs broadcasts on multiple threads. In the traces below, thread 1 (`broadcast-exchange-0`) calls `BytesToBytesMap.iterator` while serializing its broadcast `UnsafeHashedRelation`. At the same time, thread 2 (`broadcast-exchange-1`) is storing its own broadcast value. Because driver memory is insufficient, thread 2 evicts thread 1's `UnsafeHashedRelation` to disk, which serializes the same relation and calls `BytesToBytesMap.iterator` again. Both traces report the same `BytesToBytesMap` address, so the two threads iterate the same map concurrently.

https://github.com/apache/spark/blob/8749b2b6fae5ee0ce7b48aae6d859ed71e98491d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L368

broadcast-exchange-0 2020-08-27 23:06:00.270000 BytesToBytesMap Addr:1328576284

```
java.lang.Thread.getStackTrace(Thread.java:1556)
org.apache.spark.unsafe.map.BytesToBytesMap.iterator(BytesToBytesMap.java:418)
org.apache.spark.sql.execution.joins.UnsafeHashedRelation.org$apache$spark$sql$execution$joins$UnsafeHashedRelation$$write(HashedRelation.scala:225)
...
org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:277)
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
org.apache.spark.SparkContext.broadcast(SparkContext.scala:1500)
```

broadcast-exchange-1 2020-08-27 23:06:00.429000 BytesToBytesMap Addr:1328576284

```
java.lang.Thread.getStackTrace(Thread.java:1556)
org.apache.spark.unsafe.map.BytesToBytesMap.iterator(BytesToBytesMap.java:418)
org.apache.spark.sql.execution.joins.UnsafeHashedRelation.org$apache$spark$sql$execution$joins$UnsafeHashedRelation$$write(HashedRelation.scala:225)
org.apache.spark.sql.execution.joins.UnsafeHashedRelation$$anonfun$write$1.apply$mcV$sp(HashedRelation.scala:198)
...
org.apache.spark.storage.DiskStore.put(DiskStore.scala:69)
org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:1378)
org.apache.spark.storage.memory.MemoryStore.org$apache$spark$storage$memory$MemoryStore$$dropBlock$1(MemoryStore.scala:524)
org.apache.spark.storage.memory.MemoryStore$$anonfun$evictBlocksToFreeSpace$2.apply(MemoryStore.scala:545)
org.apache.spark.storage.memory.MemoryStore$$anonfun$evictBlocksToFreeSpace$2.apply(MemoryStore.scala:539)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
org.apache.spark.storage.memory.MemoryStore.evictBlocksToFreeSpace(MemoryStore.scala:539)
org.apache.spark.memory.StorageMemoryPool.acquireMemory(StorageMemoryPool.scala:92)
org.apache.spark.memory.StorageMemoryPool.acquireMemory(StorageMemoryPool.scala:73)
org.apache.spark.memory.UnifiedMemoryManager.acquireStorageMemory(UnifiedMemoryManager.scala:179)
org.apache.spark.memory.UnifiedMemoryManager.acquireUnrollMemory(UnifiedMemoryManager.scala:186)
org.apache.spark.storage.memory.MemoryStore.reserveUnrollMemoryForThisTask(MemoryStore.scala:582)
org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:223)
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1039)
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1030)
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:970)
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1030)
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:793)
org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1351)
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:122)
org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
org.apache.spark.SparkContext.broadcast(SparkContext.scala:1500)
```
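The following simplified Java model illustrates how two concurrent iterators over one map can silently produce wrong records. It assumes the shared state is a mutable record holder that every iterator fills and returns from `next()`. `Slot`, `TinyMap`, `sharedIterator`, `independentIterator`, and `readWithInterference` are hypothetical names for illustration. They are not Spark's `BytesToBytesMap` API. A deterministic interleaving stands in for the two broadcast threads. Treat this as a sketch under those assumptions, not the actual Spark code or patch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical mutable record holder that an iterator fills and returns from next().
// Illustrative only; NOT Spark's BytesToBytesMap.Location.
final class Slot {
    int key;
    int value;
}

// Hypothetical read-only map used only to illustrate the hazard.
final class TinyMap {
    private final int[] keys;
    private final int[] values;
    // One holder owned by the map and handed to every "shared" iterator.
    private final Slot sharedSlot = new Slot();

    TinyMap(int[] keys, int[] values) {
        this.keys = keys.clone();
        this.values = values.clone();
    }

    // Unsafe for concurrent use: every iterator writes into, and returns, the same Slot.
    Iterator<Slot> sharedIterator() {
        return new SlotIterator(sharedSlot);
    }

    // Each iterator owns its Slot, so iterators cannot overwrite each other's records.
    Iterator<Slot> independentIterator() {
        return new SlotIterator(new Slot());
    }

    private final class SlotIterator implements Iterator<Slot> {
        private int pos = 0;      // position is per-iterator in both variants
        private final Slot slot;  // holder returned by next(); may be shared

        SlotIterator(Slot slot) {
            this.slot = slot;
        }

        @Override
        public boolean hasNext() {
            return pos < keys.length;
        }

        @Override
        public Slot next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            slot.key = keys[pos];
            slot.value = values[pos];
            pos++;
            return slot;
        }
    }

    // Reads all keys through `reader`, letting `other` (which starts one record
    // ahead) advance between reader.next() and the moment the returned Slot is
    // read. This deterministically mimics a second thread iterating the same map.
    static List<Integer> readWithInterference(Iterator<Slot> reader, Iterator<Slot> other) {
        List<Integer> keysSeen = new ArrayList<>();
        if (other.hasNext()) {
            other.next();
        }
        while (reader.hasNext()) {
            Slot s = reader.next();
            if (other.hasNext()) {
                other.next(); // the "other thread" runs here
            }
            keysSeen.add(s.key);
        }
        return keysSeen;
    }
}
```

Consider keys `{10, 20, 30}` where the reader shares its holder with a second iterator. The reader sees `[20, 30, 30]` instead of `[10, 20, 30]`. Records are silently duplicated or skipped rather than raising an error, which is the kind of wrong-result symptom described above. Giving each iterator its own holder, as `independentIterator` does, is one way to remove this particular hazard.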
