cxzl25 commented on pull request #28286:
URL: https://github.com/apache/spark/pull/28286#issuecomment-683543986
Reproducing this bug through SQL alone is really difficult, so I wrote a unit test against `UnsafeHashedRelation` instead; I'm not sure whether it helps. The test goes in `org.apache.spark.sql.execution.joins.HashedRelationSuite`:
```scala
test("SPARK-31511: Make BytesToBytesMap iterators thread-safe") {
val ser = sparkContext.env.serializer.newInstance()
val key = Seq(BoundReference(0, LongType, false))
val unsafeProj = UnsafeProjection.create(
Seq(BoundReference(0, LongType, false), BoundReference(1, IntegerType,
true)))
val rows = (0 until 10000).map(i =>
unsafeProj(InternalRow(Int.int2long(i), i + 1)).copy())
val unsafeHashed = UnsafeHashedRelation(rows.iterator, key, 1, mm)
val os = new ByteArrayOutputStream()
val thread1 = new Thread {
override def run(): Unit = {
val out = new ObjectOutputStream(os)
unsafeHashed.asInstanceOf[UnsafeHashedRelation].writeExternal(out)
out.flush()
}
}
val thread2 = new Thread {
override def run(): Unit = {
val threadOut = new ObjectOutputStream(new ByteArrayOutputStream())
unsafeHashed.asInstanceOf[UnsafeHashedRelation].writeExternal(threadOut)
threadOut.flush()
}
}
thread1.start()
thread2.start()
thread1.join()
thread2.join()
val unsafeHashed2 =
ser.deserialize[UnsafeHashedRelation](ser.serialize(unsafeHashed))
val os2 = new ByteArrayOutputStream()
val out2 = new ObjectOutputStream(os2)
unsafeHashed2.writeExternal(out2)
out2.flush()
assert(java.util.Arrays.equals(os.toByteArray, os2.toByteArray))
}
```
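For context on why concurrent `writeExternal` calls can go wrong: serialization walks the relation's underlying `BytesToBytesMap` through an iterator, and the test name suggests that iterator state was shared across callers before this fix. Here is a minimal sketch of that general hazard, using a plain shared Scala iterator rather than Spark's classes; everything in it (`SharedIteratorRace`, `drain`) is illustrative, not a Spark API:
```scala
// Illustrative sketch only, not Spark code: two threads draining one
// shared, stateful iterator lose or duplicate elements -- the same class
// of bug the UT above exercises on BytesToBytesMap.
object SharedIteratorRace {
  def main(args: Array[String]): Unit = {
    val total = 100000
    val shared = (0 until total).iterator // one mutable iterator, no locking

    // Count how many elements this thread manages to pull off the iterator.
    def drain(): Int = {
      var n = 0
      try {
        while (shared.hasNext) { shared.next(); n += 1 } // unsynchronized
      } catch {
        // A racing thread can consume the last element between our
        // hasNext and next() calls.
        case _: NoSuchElementException =>
      }
      n
    }

    var a, b = 0
    val t1 = new Thread { override def run(): Unit = a = drain() }
    val t2 = new Thread { override def run(): Unit = b = drain() }
    t1.start(); t2.start()
    t1.join(); t2.join()
    // With shared state the split between threads is arbitrary and
    // non-deterministic; a per-caller iterator would let each thread
    // see every element.
    println(s"thread1 saw $a, thread2 saw $b, expected $total each")
  }
}
```
Running this a few times typically shows an uneven, run-to-run-varying split, which mirrors how an unsynchronized map iterator can yield corrupted serialized output in the UT above.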
before: [screenshot of the test run before the fix]

after: [screenshot of the test run after the fix]