HyukjinKwon commented on a change in pull request #24958: [SPARK-28153][PYTHON]
Use AtomicReference at InputFileBlockHolder (to support input_file_name with
Python UDF)
URL: https://github.com/apache/spark/pull/24958#discussion_r297203041
##########
File path: core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala
##########
@@ -68,11 +71,17 @@ private[spark] object InputFileBlockHolder {
require(filePath != null, "filePath cannot be null")
require(startOffset >= 0, s"startOffset ($startOffset) cannot be negative")
require(length >= 0, s"length ($length) cannot be negative")
- inputBlock.set(new FileBlock(UTF8String.fromString(filePath), startOffset, length))
+ inputBlock.get().set(new FileBlock(UTF8String.fromString(filePath), startOffset, length))
}
/**
* Clears the input file block to default value.
*/
def unset(): Unit = inputBlock.remove()
+
+ /**
+ * Initializes thread local by explicitly getting the value. It triggers ThreadLocal's
+ * initialValue in the parent thread.
+ */
+ def initialize(): Unit = inputBlock.get()
Review comment:
We need this. This is where I was stuck and confused for a while. We create
the thread local, but creating it doesn't call `initialValue`.
So, previously, here's what happened:
1. The input iterator that calls `InputFileBlockHolder.set` is executed in a
child thread (the Python writer thread).
2. The child thread calls **`initialValue`** (not `childValue`) and sets a new
reference.
3. After that, the parent thread reads it (say,
`InputFileBlockHolder.getXXX`). Here, the parent thread calls
**`initialValue`** again, because `initialValue` was never called in the
parent thread, so it ends up with a different reference from the child's.
After the fix:
1. At the start of the task, the parent thread calls `initialValue`.
2. After that, the child thread calls **`childValue`** (not `initialValue`);
therefore, it points to the same reference.
3. After that, the parent thread can access the same reference.
I double-checked these, so unless I am totally mistaken, this should be
correct ^.
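The two sequences above can be reproduced with a small standalone sketch. It is not the Spark code: `HolderSketch`, `readAfterChildWrite`, and the `String` payload are hypothetical stand-ins for `InputFileBlockHolder` and `FileBlock`, and a plain `Thread` plays the role of the Python writer thread. It relies only on the standard `InheritableThreadLocal` behavior, where `childValue` runs at child-thread construction and only for values the parent already holds.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for InputFileBlockHolder; all names are illustrative.
object HolderSketch {
  // Each thread's slot holds a reference cell. childValue hands the parent's
  // cell to the child, so both threads share one AtomicReference.
  private val holder = new InheritableThreadLocal[AtomicReference[String]] {
    override protected def initialValue(): AtomicReference[String] =
      new AtomicReference[String]("")
    override protected def childValue(
        parent: AtomicReference[String]): AtomicReference[String] = parent
  }

  // Forces initialValue in the calling thread, like initialize() in the diff.
  def initialize(): Unit = holder.get()
  def set(path: String): Unit = holder.get().set(path)
  def get: String = holder.get().get()

  // Runs one "task" thread that spawns a writer child thread, then returns
  // what the task (parent) thread reads after the child has written.
  def readAfterChildWrite(initFirst: Boolean): String = {
    var result = ""
    val task = new Thread(new Runnable {
      def run(): Unit = {
        // Must happen before the child is constructed to be inherited.
        if (initFirst) initialize()
        val child = new Thread(new Runnable {
          def run(): Unit = set("file:///tmp/part-0")
        })
        child.start()
        child.join()
        result = get
      }
    })
    task.start()
    task.join()
    result
  }
}
```

Under these assumptions, `HolderSketch.readAfterChildWrite(initFirst = false)` returns the empty default, because the parent and the child each create their own cell through `initialValue`. With `initFirst = true` it returns the path written by the child, because the child inherits the parent's cell through `childValue`.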