sumeetgajjar commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r779842230



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -1073,8 +1135,51 @@ private[joins] object LongHashedRelation {
         return HashedRelationWithAllNullKeys
       }
     }
-    map.optimize()
-    new LongHashedRelation(numFields, map)
+
+    val reorderMap = reorderFactor.exists(_ * map.numUniqueKeys <= map.numTotalValues)
+    val finalMap = if (reorderMap) {
+      // reorganize the hash map so that nodes of a given linked list are next to each other in
+      // memory.
+      logInfo(s"Reordering LongToUnsafeRowMap, numUniqueKeys: ${map.numUniqueKeys}, " +
+        s"numTotalValues: ${map.numTotalValues}")
+      // An exception due to insufficient memory can occur either during initialization or while
+      // adding rows to the map.
+      // 1. Failure occurs during initialization i.e. in LongToUnsafeRowMap.init:
+      // release of the partially allocated memory is already taken care of in the
+      // LongToUnsafeRowMap.ensureAcquireMemory method thus no further action is required.
+      // 2. Failure occurs while adding rows to the map: the partially allocated memory
+      // is not cleaned up, thus LongToUnsafeRowMap.free is invoked in the catch clause.
+      var maybeCompactMap: Option[LongToUnsafeRowMap] = None
+      try {
+        maybeCompactMap = Some(new LongToUnsafeRowMap(taskMemoryManager,
+          Math.toIntExact(map.numUniqueKeys)))
+        val compactMap = maybeCompactMap.get
+        val resultRow = new UnsafeRow(numFields)
+        map.keys().foreach { rowKey =>
+          val key = rowKey.getLong(0)
+          map.get(key, resultRow).foreach { row =>
+            compactMap.append(key, row)
+          }
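
The excerpt above is cut off before the catch clause that the comments refer to. As a rough, self-contained sketch of the cleanup pattern being described (the `CompactMap` and `buildCompact` names are hypothetical stand-ins, not Spark classes, and the thrown `OutOfMemoryError` just stands in for a failed memory acquisition; only the idea of calling `free` on the partially built map in the catch clause comes from the comments above):

```scala
object PartialAllocationCleanup {

  // Hypothetical stand-in for LongToUnsafeRowMap: acquires its memory up front
  // and must be freed explicitly by whoever built it.
  final class CompactMap(capacity: Int) {
    private val buffer = new Array[Long](capacity) // stand-in for acquired memory pages
    private var size = 0
    def append(key: Long): Unit = {
      if (size >= buffer.length) throw new OutOfMemoryError("illustrative allocation failure")
      buffer(size) = key
      size += 1
    }
    def free(): Unit = { size = 0 } // stand-in for releasing the acquired memory
  }

  // Case (1) from the comments: a failure inside the constructor releases its own
  // partial allocation. Case (2): a failure while appending leaves a partially
  // populated map behind, so the caller frees it in the catch clause.
  def buildCompact(keys: Seq[Long], capacity: Int): Option[CompactMap] = {
    var maybeMap: Option[CompactMap] = None
    try {
      maybeMap = Some(new CompactMap(capacity))
      keys.foreach(maybeMap.get.append)
      maybeMap
    } catch {
      case _: OutOfMemoryError =>
        maybeMap.foreach(_.free())
        None
    }
  }

  def main(args: Array[String]): Unit = {
    println(buildCompact(Seq(1L, 2L, 3L), capacity = 4).isDefined) // true
    println(buildCompact(Seq(1L, 2L, 3L), capacity = 2).isDefined) // false: freed and dropped
  }
}
```

The point of case (2) is that once construction has succeeded, the caller owns the partially populated map and is the only place left that can release it.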

Review comment:
       > I am also not sure of the performance penalty when doing shuffled hash join with large hash table in memory. We need to probe each key in the hash table to rebuild the table, and we kind of waste of the time of 1st build of table.
   
   I totally agree that the time spent on the first build of the table is, in a sense, wasted. However, the time taken to rebuild the table is minuscule compared to the overall execution time of the query, as the example above clearly shows.
   Also, in the above case I was running the Spark application in local mode. If we run the application on YARN or K8s, the overall execution time will increase due to the added network cost, and the table rebuild time would become an even smaller fraction of the overall time.
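   To put the trade-off in context, the rebuild is also gated by the guard at the top of the diff, so the second build pass only runs when the map averages enough values per key for compacting the linked lists to pay off. A minimal standalone sketch of that condition (the `shouldReorder` helper and the factor value are illustrative; only the condition mirrors the PR's `reorderFactor.exists(_ * map.numUniqueKeys <= map.numTotalValues)`):

```scala
// Standalone sketch of the reorder guard; shouldReorder itself is a
// hypothetical helper, only its condition mirrors the PR's expression.
def shouldReorder(
    reorderFactor: Option[Int],
    numUniqueKeys: Long,
    numTotalValues: Long): Boolean =
  reorderFactor.exists(_ * numUniqueKeys <= numTotalValues)

// With an illustrative factor of 3: 1,000 unique keys and 5,000 total values
// (5 rows per key on average) trigger the rebuild, while 2,500 total values
// do not, so the extra pass is only paid when per-key chains are long.
val denseMap  = shouldReorder(Some(3), numUniqueKeys = 1000L, numTotalValues = 5000L) // true
val sparseMap = shouldReorder(Some(3), numUniqueKeys = 1000L, numTotalValues = 2500L) // false
```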



