sumeetgajjar commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r779813987



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -476,23 +494,60 @@ private[joins] object UnsafeHashedRelation {
       numFields = row.numFields()
       val key = keyGenerator(row)
       if (!key.anyNull || allowsNullKey) {
-        val loc = binaryMap.lookup(key.getBaseObject, key.getBaseOffset, 
key.getSizeInBytes)
-        if (!(ignoresDuplicatedKey && loc.isDefined)) {
-          val success = loc.append(
-            key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
-            row.getBaseObject, row.getBaseOffset, row.getSizeInBytes)
-          if (!success) {
-            binaryMap.free()
-            throw 
QueryExecutionErrors.cannotAcquireMemoryToBuildUnsafeHashedRelationError()
-          }
-        }
+        mapAppendHelper(key, row, binaryMap)
       } else if (isNullAware) {
         binaryMap.free()
         return HashedRelationWithAllNullKeys
       }
     }
 
-    new UnsafeHashedRelation(key.size, numFields, binaryMap)
+    val relation = new UnsafeHashedRelation(key.size, numFields, binaryMap)
+    val reorderMap = reorderFactor.exists(_ * binaryMap.numKeys() <= 
binaryMap.numValues())
+    if (reorderMap) {
+      // Reorganize the hash map so that nodes of a given linked list are next 
to each other in
+      // memory.
+      logInfo(s"Reordering BytesToBytesMap, uniqueKeys: 
${binaryMap.numKeys()}, " +
+        s"totalNumValues: ${binaryMap.numValues()}")
+      // An exception due to insufficient memory can occur either during 
initialization or while
+      // adding rows to the map.
+      // 1. Failure occurs during initialization i.e. in 
BytesToBytesMap.allocate:
+      // release of the partially allocated memory is already taken care of 
through
+      // MemoryConsumer.allocateArray method thus no further action is 
required.
+      // 2. Failure occurs while adding rows to the map: mapAppendHelper 
invokes
+      // BytesToBytesMap.free method when a row cannot be appended to the map, 
thereby cleaning the
+      // partially allocated memory.
+      try {
+        val compactMap = new BytesToBytesMap(
+          taskMemoryManager,
+          // Only 70% of the slots can be used before growing; more capacity helps to reduce
+          // collisions
+          (binaryMap.numKeys() * 1.5 + 1).toInt,
+          pageSizeBytes)
+        // relation.keys() returns all keys and not just distinct keys thus 
distinct operation is
+        // applied to find unique keys
+        relation.keys().map(_.copy()).toSeq.distinct.foreach { key =>
+          relation.get(key).foreach { row =>
+            val unsafeRow = row.asInstanceOf[UnsafeRow]
+            val unsafeKey = keyGenerator(unsafeRow)
+            mapAppendHelper(unsafeKey, unsafeRow, compactMap)
+          }
+        }
+        relation.close()
+        logInfo("BytesToBytesMap reordered")
+        new UnsafeHashedRelation(key.size, numFields, compactMap)
+      } catch {
+        case e: SparkOutOfMemoryError =>
+          logWarning("Reordering BytesToBytesMap failed, " +

Review comment:
       Good catch, will modify the warning message.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to