HyukjinKwon commented on a change in pull request #23787: [SPARK-26830][SQL][R]
Vectorized R dapply() implementation
URL: https://github.com/apache/spark/pull/23787#discussion_r259602083
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/r/ArrowRRunner.scala
##########
@@ -55,13 +56,32 @@ class ArrowRRunner(
numPartitions = -1,
isDataFrame = true,
schema.fieldNames,
- RRunnerModes.DATAFRAME_GAPPLY) {
+ mode) {
+
+ // TODO: it needs to refactor to share the same code with RRunner, and have separate
+ // ArrowRRunners.
+ private val getNextBatch = {
+ if (mode == RRunnerModes.DATAFRAME_GAPPLY) {
+ // gapply
+ (inputIterator: Iterator[_], keys: collection.mutable.ArrayBuffer[Array[Byte]]) => {
+ val (key, nextBatch) = inputIterator
+ .asInstanceOf[Iterator[(Array[Byte], Iterator[InternalRow])]].next()
+ keys.append(key)
+ nextBatch
+ }
+ } else {
+ // dapply
+ (inputIterator: Iterator[_], keys: collection.mutable.ArrayBuffer[Array[Byte]]) => {
+ inputIterator
+ .asInstanceOf[Iterator[Iterator[InternalRow]]].next()
+ }
+ }
+ }
protected override def writeData(
dataOut: DataOutputStream,
printOut: PrintStream,
- iter: Iterator[_]): Unit = if (iter.hasNext) {
- val inputIterator = iter.asInstanceOf[Iterator[(Array[Byte], Iterator[InternalRow])]]
+ inputIterator: Iterator[_]): Unit = if (inputIterator.hasNext) {
Review comment:
Oh, yeah. I actually tested them manually. I'll fix the tests to check this as
well.
Actually, I noticed that the R worker works a bit differently.
If the iterator is empty, the JVM side sends `0` first, and the R worker then
does not use any other data (see `isEmpty <- SparkR:::readInt(inputCon)` in `worker.R` and
```scala
if (!iter.hasNext) {
  dataOut.writeInt(0)
} else {
  dataOut.writeInt(1)
}
```
in `RRunner.scala`).
So it effectively doesn't matter whether we send the data or not. I simply
decided to ignore this case, since it's already ignored on the R worker side.
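To illustrate the point, here is a minimal, self-contained sketch of the flag-first handshake. It is not the actual Spark code: `EmptyFlagSketch`, `write`, `read`, the `extra` parameter, and the plain-`Int` payload encoding are all hypothetical names and simplifications made up for this example. Only the `0`/`1` flag written before the data mirrors the snippet quoted above.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object EmptyFlagSketch {
  // Writer side (hypothetical): emit the emptiness flag first, as in the quoted
  // check, then send the rows plus any `extra` data regardless of the flag.
  def write(iter: Iterator[Int], extra: Seq[Int] = Nil): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val dataOut = new DataOutputStream(bytes)
    if (!iter.hasNext) {
      dataOut.writeInt(0)
    } else {
      dataOut.writeInt(1)
    }
    iter.foreach(i => dataOut.writeInt(i))
    extra.foreach(i => dataOut.writeInt(i))
    dataOut.flush()
    bytes.toByteArray
  }

  // Reader side (hypothetical stand-in for the worker): read the flag first and
  // stop immediately when it is 0, so anything sent after it is never consumed.
  def read(bytes: Array[Byte]): List[Int] = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val flag = in.readInt()
    if (flag == 0) {
      Nil
    } else {
      val rows = List.newBuilder[Int]
      while (in.available() > 0) {
        rows += in.readInt()
      }
      rows.result()
    }
  }
}
```

With this shape, `EmptyFlagSketch.read(EmptyFlagSketch.write(Iterator.empty, Seq(7, 8)))` yields an empty list even though extra data was sent after the flag, which is the sense in which sending or not sending the data makes no difference for an empty iterator.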