c21 commented on a change in pull request #35047:
URL: https://github.com/apache/spark/pull/35047#discussion_r779783880
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -3380,6 +3380,17 @@ object SQLConf {
    .checkValue(_ >= 0, "The value must be non-negative.")
    .createWithDefault(8)

+  val HASHED_RELATION_REORDER_FACTOR = buildConf("spark.sql.hashedRelationReorderFactor")
+    .doc("The HashedRelation will be reordered if the number of unique keys times this factor is " +
+      "less than or equal to the total number of rows in the HashedRelation. " +
+      "The reordering places all rows with the same key adjacent to each other to improve " +
+      "spatial locality. This provides a performance boost while iterating over the rows for a " +
+      "given key due to increased cache hits.")
+    .version("3.3.0")
+    .doubleConf
+    .checkValue(_ > 1, "The value must be greater than 1.")
+    .createOptional
Review comment:
Curious what would be a good default value for this? Are we expecting users to tune this config for each query? If users need to tune it per query, then I feel this feature is less useful, because a user can always choose to rewrite the query to sort on the join keys before the join.
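
For context, the condition implied by the doc string reduces to a simple duplication check on the build side. A minimal standalone sketch of that check (the `shouldReorder` helper and the sample numbers are illustrative, not part of the PR):

```scala
// Sketch of the decision spark.sql.hashedRelationReorderFactor controls: reorder only
// when keys are duplicated enough that factor * uniqueKeys <= totalRows.
object ReorderDecisionSketch {
  def shouldReorder(reorderFactor: Option[Double], uniqueKeys: Long, totalRows: Long): Boolean =
    reorderFactor.exists(_ * uniqueKeys <= totalRows)

  def main(args: Array[String]): Unit = {
    // factor = 2.0: 100K unique keys over 1M rows (~10 rows per key) triggers a reorder...
    assert(shouldReorder(Some(2.0), 100000L, 1000000L))
    // ...while a mostly-unique build side does not.
    assert(!shouldReorder(Some(2.0), 900000L, 1000000L))
    // Leaving the config unset (createOptional) disables reordering entirely.
    assert(!shouldReorder(None, 100000L, 1000000L))
  }
}
```

This also shows why picking a default is hard: the useful threshold depends on how skewed the key duplication is in a given query, which is the concern raised above.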
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -1073,8 +1135,51 @@ private[joins] object LongHashedRelation {
        return HashedRelationWithAllNullKeys
      }
    }
-    map.optimize()
-    new LongHashedRelation(numFields, map)
+
+    val reorderMap = reorderFactor.exists(_ * map.numUniqueKeys <= map.numTotalValues)
+    val finalMap = if (reorderMap) {
+      // reorganize the hash map so that nodes of a given linked list are next to each other in
+      // memory.
+      logInfo(s"Reordering LongToUnsafeRowMap, numUniqueKeys: ${map.numUniqueKeys}, " +
+        s"numTotalValues: ${map.numTotalValues}")
+      // An exception due to insufficient memory can occur either during initialization or while
+      // adding rows to the map.
+      // 1. Failure occurs during initialization i.e. in LongToUnsafeRowMap.init:
+      // release of the partially allocated memory is already taken care of in the
+      // LongToUnsafeRowMap.ensureAcquireMemory method thus no further action is required.
+      // 2. Failure occurs while adding rows to the map: the partially allocated memory
+      // is not cleaned up, thus LongToUnsafeRowMap.free is invoked in the catch clause.
+      var maybeCompactMap: Option[LongToUnsafeRowMap] = None
+      try {
+        maybeCompactMap = Some(new LongToUnsafeRowMap(taskMemoryManager,
+          Math.toIntExact(map.numUniqueKeys)))
+        val compactMap = maybeCompactMap.get
+        val resultRow = new UnsafeRow(numFields)
+        map.keys().foreach { rowKey =>
+          val key = rowKey.getLong(0)
+          map.get(key, resultRow).foreach { row =>
+            compactMap.append(key, row)
+          }
Review comment:
> It's a valid concern that this puts more memory pressure on the driver. Is it possible to improve the relation-building logic and make it co-locate the values of the same key? Then we don't need to rewrite the relation.

+1 on this. Besides the memory concern, I am also unsure about the performance penalty when doing a shuffled hash join with a large hash table in memory. We need to probe every key in the hash table to rebuild it, so the time spent on the first build of the table is largely wasted.
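
To make the co-location idea concrete, here is a rough sketch using plain Scala collections (not the real LongToUnsafeRowMap/BytesToBytesMap, which append serialized rows into managed memory pages); it only illustrates grouping values per key during the first build so that no second compaction pass is needed:

```scala
import scala.collection.mutable

object CoLocatedBuildSketch {
  // Hypothetical builder: values for the same key go into one growable buffer as rows
  // stream in, so they are already adjacent when the map is handed to the probe side.
  def buildCoLocated[K, V](
      rows: Iterator[(K, V)]): mutable.LinkedHashMap[K, mutable.ArrayBuffer[V]] = {
    val grouped = mutable.LinkedHashMap.empty[K, mutable.ArrayBuffer[V]]
    rows.foreach { case (key, value) =>
      grouped.getOrElseUpdate(key, mutable.ArrayBuffer.empty[V]) += value
    }
    grouped
  }

  def main(args: Array[String]): Unit = {
    val rows = Iterator(1L -> "a", 2L -> "b", 1L -> "c", 1L -> "d", 2L -> "e")
    val map = buildCoLocated(rows)
    // Rows for key 1 are contiguous: ArrayBuffer(a, c, d)
    println(map(1L))
  }
}
```

Whether something like this is feasible inside the actual build path, which appends rows into the map incrementally as they arrive, is exactly the open question in this thread.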
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -476,23 +494,60 @@ private[joins] object UnsafeHashedRelation {
      numFields = row.numFields()
      val key = keyGenerator(row)
      if (!key.anyNull || allowsNullKey) {
-        val loc = binaryMap.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes)
-        if (!(ignoresDuplicatedKey && loc.isDefined)) {
-          val success = loc.append(
-            key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
-            row.getBaseObject, row.getBaseOffset, row.getSizeInBytes)
-          if (!success) {
-            binaryMap.free()
-            throw QueryExecutionErrors.cannotAcquireMemoryToBuildUnsafeHashedRelationError()
-          }
-        }
+        mapAppendHelper(key, row, binaryMap)
      } else if (isNullAware) {
        binaryMap.free()
        return HashedRelationWithAllNullKeys
      }
    }
-    new UnsafeHashedRelation(key.size, numFields, binaryMap)
+    val relation = new UnsafeHashedRelation(key.size, numFields, binaryMap)
+    val reorderMap = reorderFactor.exists(_ * binaryMap.numKeys() <= binaryMap.numValues())
+    if (reorderMap) {
+      // Reorganize the hash map so that nodes of a given linked list are next to each other in
+      // memory.
+      logInfo(s"Reordering BytesToBytesMap, uniqueKeys: ${binaryMap.numKeys()}, " +
+        s"totalNumValues: ${binaryMap.numValues()}")
+      // An exception due to insufficient memory can occur either during initialization or while
+      // adding rows to the map.
+      // 1. Failure occurs during initialization i.e. in BytesToBytesMap.allocate:
+      // release of the partially allocated memory is already taken care of through
+      // MemoryConsumer.allocateArray method thus no further action is required.
+      // 2. Failure occurs while adding rows to the map: mapAppendHelper invokes
+      // BytesToBytesMap.free method when a row cannot be appended to the map, thereby cleaning the
+      // partially allocated memory.
+      try {
+        val compactMap = new BytesToBytesMap(
+          taskMemoryManager,
+          // Only 70% of the slots can be used before growing, more capacity help to reduce
+          // collision
+          (binaryMap.numKeys() * 1.5 + 1).toInt,
+          pageSizeBytes)
+        // relation.keys() returns all keys and not just distinct keys thus distinct operation is
+        // applied to find unique keys
+        relation.keys().map(_.copy()).toSeq.distinct.foreach { key =>
+          relation.get(key).foreach { row =>
+            val unsafeRow = row.asInstanceOf[UnsafeRow]
+            val unsafeKey = keyGenerator(unsafeRow)
+            mapAppendHelper(unsafeKey, unsafeRow, compactMap)
+          }
+        }
+        relation.close()
+        logInfo("BytesToBytesMap reordered")
+        new UnsafeHashedRelation(key.size, numFields, compactMap)
+      } catch {
+        case e: SparkOutOfMemoryError =>
+          logWarning("Reordering BytesToBytesMap failed, " +
Review comment:
HashedRelation building can happen either on the driver side (for broadcast join) or on the executor side (for shuffled hash join), so `try increasing the driver memory to mitigate it` is not an accurate suggestion for users. We probably want to suggest that users disable reordering when it causes OOM here.
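
As a hedged sketch of what such a re-worded message might look like (the surrounding names are illustrative; only the config key comes from the SQLConf diff above):

```scala
// Illustrative only: point users at the reorder config rather than at driver memory,
// since the HashedRelation build can run on the driver (broadcast join) or on an
// executor (shuffled hash join).
val reorderOomHint: String =
  "Reordering the hash map failed due to insufficient memory; falling back to the " +
    "original HashedRelation. Unset spark.sql.hashedRelationReorderFactor to disable " +
    "reordering if this happens repeatedly."
// e.g. logWarning(reorderOomHint, e) inside the catch block shown above.
```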