c21 commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r779783880



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -3380,6 +3380,17 @@ object SQLConf {
       .checkValue(_ >= 0, "The value must be non-negative.")
       .createWithDefault(8)
 
+  val HASHED_RELATION_REORDER_FACTOR = buildConf("spark.sql.hashedRelationReorderFactor")
+    .doc("The HashedRelation will be reordered if the number of unique keys times this factor is " +
+      "less than or equal to the total number of rows in the HashedRelation. " +
+      "The reordering places all rows with the same key adjacent to each other to improve " +
+      "spatial locality. This provides a performance boost while iterating over the rows for a " +
+      "given key due to increased cache hits.")
+    .version("3.3.0")
+    .doubleConf
+    .checkValue(_ > 1, "The value must be greater than 1.")
+    .createOptional
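
For context, a minimal sketch of how a user might set this config, assuming a SparkSession named `spark`; the value 4.0 is purely illustrative and not from the PR:

```scala
// Illustrative only: with a factor of 4.0, a HashedRelation holding 1,000
// unique keys qualifies for reordering once it has at least 4,000 rows,
// since 4.0 * 1000 <= numRows.
spark.conf.set("spark.sql.hashedRelationReorderFactor", "4.0")
```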

Review comment:
       Curious what would be a good default value for this? Are we expecting users to tune this config for each query? If users need to tune it per query, then I feel this feature is less useful, because a user can always rewrite the query to sort on the join keys before the join (see the sketch below).
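
A minimal sketch of that manual workaround, assuming illustrative DataFrames `buildDf` and `probeDf` joined on a hypothetical column `k` (none of these names come from the PR):

```scala
// Pre-sort the (smaller) build side on the join key so rows with the same key
// are already adjacent when the hash relation is built, making a later
// reordering pass unnecessary.
val sortedBuild = buildDf.sortWithinPartitions("k")
val joined = probeDf.join(sortedBuild, "k")
```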

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -1073,8 +1135,51 @@ private[joins] object LongHashedRelation {
         return HashedRelationWithAllNullKeys
       }
     }
-    map.optimize()
-    new LongHashedRelation(numFields, map)
+
+    val reorderMap = reorderFactor.exists(_ * map.numUniqueKeys <= map.numTotalValues)
+    val finalMap = if (reorderMap) {
+      // reorganize the hash map so that nodes of a given linked list are next to each other in
+      // memory.
+      logInfo(s"Reordering LongToUnsafeRowMap, numUniqueKeys: ${map.numUniqueKeys}, " +
+        s"numTotalValues: ${map.numTotalValues}")
+      // An exception due to insufficient memory can occur either during initialization or while
+      // adding rows to the map.
+      // 1. Failure occurs during initialization i.e. in LongToUnsafeRowMap.init:
+      // release of the partially allocated memory is already taken care of in the
+      // LongToUnsafeRowMap.ensureAcquireMemory method thus no further action is required.
+      // 2. Failure occurs while adding rows to the map: the partially allocated memory
+      // is not cleaned up, thus LongToUnsafeRowMap.free is invoked in the catch clause.
+      var maybeCompactMap: Option[LongToUnsafeRowMap] = None
+      try {
+        maybeCompactMap = Some(new LongToUnsafeRowMap(taskMemoryManager,
+          Math.toIntExact(map.numUniqueKeys)))
+        val compactMap = maybeCompactMap.get
+        val resultRow = new UnsafeRow(numFields)
+        map.keys().foreach { rowKey =>
+          val key = rowKey.getLong(0)
+          map.get(key, resultRow).foreach { row =>
+            compactMap.append(key, row)
+          }

Review comment:
       > It's a valid concern that this puts more memory pressure on the driver. Is it possible to improve the relation-building logic and make it co-locate the values of the same key? Then we don't need to rewrite the relation.
   
   +1 on this. Besides the memory concern, I am also unsure about the performance penalty when doing a shuffled hash join with a large hash table in memory: we need to probe each key in the hash table to rebuild it, so the time spent on the first build of the table is largely wasted.
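
A minimal sketch of the co-location idea from the quoted comment, written against a generic `append` callback rather than the PR's actual map types (the helper name and signature are assumptions):

```scala
// Illustrative only: co-locate the values of each key during the first build
// by grouping the input up front, so no second rebuild pass is needed. The
// trade-off is buffering the whole input in memory before any append happens.
def buildCoLocated[K, V](rows: Iterator[(K, V)], append: (K, V) => Unit): Unit = {
  rows.toSeq.groupBy(_._1).foreach { case (key, values) =>
    // All values for one key are appended consecutively, so they land
    // adjacent to each other in the underlying storage.
    values.foreach { case (_, value) => append(key, value) }
  }
}
```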

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -476,23 +494,60 @@ private[joins] object UnsafeHashedRelation {
       numFields = row.numFields()
       val key = keyGenerator(row)
       if (!key.anyNull || allowsNullKey) {
-        val loc = binaryMap.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes)
-        if (!(ignoresDuplicatedKey && loc.isDefined)) {
-          val success = loc.append(
-            key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
-            row.getBaseObject, row.getBaseOffset, row.getSizeInBytes)
-          if (!success) {
-            binaryMap.free()
-            throw QueryExecutionErrors.cannotAcquireMemoryToBuildUnsafeHashedRelationError()
-          }
-        }
+        mapAppendHelper(key, row, binaryMap)
       } else if (isNullAware) {
         binaryMap.free()
         return HashedRelationWithAllNullKeys
       }
     }
 
-    new UnsafeHashedRelation(key.size, numFields, binaryMap)
+    val relation = new UnsafeHashedRelation(key.size, numFields, binaryMap)
+    val reorderMap = reorderFactor.exists(_ * binaryMap.numKeys() <= binaryMap.numValues())
+    if (reorderMap) {
+      // Reorganize the hash map so that nodes of a given linked list are next to each other in
+      // memory.
+      logInfo(s"Reordering BytesToBytesMap, uniqueKeys: ${binaryMap.numKeys()}, " +
+        s"totalNumValues: ${binaryMap.numValues()}")
+      // An exception due to insufficient memory can occur either during initialization or while
+      // adding rows to the map.
+      // 1. Failure occurs during initialization i.e. in BytesToBytesMap.allocate:
+      // release of the partially allocated memory is already taken care of through the
+      // MemoryConsumer.allocateArray method, thus no further action is required.
+      // 2. Failure occurs while adding rows to the map: mapAppendHelper invokes the
+      // BytesToBytesMap.free method when a row cannot be appended to the map, thereby cleaning up
+      // the partially allocated memory.
+      try {
+        val compactMap = new BytesToBytesMap(
+          taskMemoryManager,
+          // Only 70% of the slots can be used before growing; more capacity helps to reduce
+          // collisions
+          (binaryMap.numKeys() * 1.5 + 1).toInt,
+          pageSizeBytes)
+        // relation.keys() returns all keys, not just distinct keys, thus a distinct operation is
+        // applied to find the unique keys
+        relation.keys().map(_.copy()).toSeq.distinct.foreach { key =>
+          relation.get(key).foreach { row =>
+            val unsafeRow = row.asInstanceOf[UnsafeRow]
+            val unsafeKey = keyGenerator(unsafeRow)
+            mapAppendHelper(unsafeKey, unsafeRow, compactMap)
+          }
+        }
+        relation.close()
+        logInfo("BytesToBytesMap reordered")
+        new UnsafeHashedRelation(key.size, numFields, compactMap)
+      } catch {
+        case e: SparkOutOfMemoryError =>
+          logWarning("Reordering BytesToBytesMap failed, " +

Review comment:
       HashedRelation building can happen either on the driver side (for broadcast join) or on the executor side (for shuffled hash join), so `try increasing the driver memory to mitigate it` is not an accurate suggestion for users. We probably want to suggest that users disable reordering when it causes OOM here (see the sketch below).
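
Since the conf is created with `createOptional`, disabling reordering presumably just means leaving `spark.sql.hashedRelationReorderFactor` unset; a minimal sketch of such a mitigation (the session name `spark` is illustrative):

```scala
// Hypothetical mitigation: removing the optional conf disables reordering
// for subsequent queries in this session.
spark.conf.unset("spark.sql.hashedRelationReorderFactor")
```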



