Github user davies commented on the pull request:
https://github.com/apache/spark/pull/8662#issuecomment-140989401
Here are some benchmarks with different approaches:
```
Without fix:
Number of udfs: 0 - 0.142887830734
Number of udfs: 1 - 0.948309898376
Number of udfs: 2 - 2.079007864
Number of udfs: 3 - 4.02105379105
Number of udfs: 4 - 8.20960092545
Number of udfs: 5 - 17.1744220257
This PR (synchronized batch):
Number of udfs: 0 - 0.146151065826
Number of udfs: 1 - 1.73253297806
Number of udfs: 2 - 2.73584198952
Number of udfs: 3 - 3.70912384987
Number of udfs: 4 - 5.21050810814
Number of udfs: 5 - 6.24335598946
Pass row around:
Number of udfs: 0 - 0.150754928589
Number of udfs: 1 - 2.33548307419
Number of udfs: 2 - 3.16792798042
Number of udfs: 3 - 4.92886400223
Number of udfs: 4 - 5.86024093628
Number of udfs: 5 - 6.71114897728
Old RDD cache:
Number of udfs: 0 - 0.187371969223
Number of udfs: 1 - 1.12094593048
Number of udfs: 2 - 1.82404088974
Number of udfs: 3 - 3.20661616325
Number of udfs: 4 - 3.62113809586
Number of udfs: 5 - 4.25435996056
Using CachedOnceRDD:
Number of udfs: 0 - 0.143986940384
Number of udfs: 1 - 1.10472989082
Number of udfs: 2 - 1.96430683136
Number of udfs: 3 - 2.14014697075
Number of udfs: 4 - 2.37295603752
Number of udfs: 5 - 2.8883831501
```
It seems that passing rows into Python is slower than the current approach (and
will be much slower with wider tables).
Using a cache is faster than the current approach because of the asynchronous
pipeline; CachedOnceRDD is faster than the RDD cache, and it also needs less memory.
The CachedOnceRDD looks like this:
```
class CachedOnceRDD(prev: RDD[InternalRow]) extends RDD[InternalRow](prev) {
  override val partitioner = firstParent[InternalRow].partitioner
  override def getPartitions: Array[Partition] =
    firstParent[InternalRow].partitions

  // Number of consumers that have called compute() so far.
  var visits: Int = 0
  // Id of the consumer currently pulling from the parent iterator.
  var owner: Int = 0
  var prevIter: Iterator[InternalRow] = null
  // Rows already pulled by the owner, waiting for the other consumers.
  var buffer: java.util.Queue[InternalRow] = null

  override def compute(split: Partition, context: TaskContext):
      Iterator[InternalRow] = {
    var myid: Int = 0
    synchronized {
      visits += 1
      myid = visits
      if (buffer == null) {
        // First consumer: create the shared buffer and become the owner.
        prevIter = prev.iterator(split, context)
        buffer = new java.util.concurrent.ConcurrentLinkedQueue[InternalRow]
        owner = myid
      }
    }
    new Iterator[InternalRow] {
      var row: InternalRow = null
      override def hasNext: Boolean = {
        if (row == null) {
          if (owner == myid) {
            // Owner: pull the next row from the parent and publish a copy.
            buffer.synchronized {
              if (prevIter != null) {
                if (prevIter.hasNext) {
                  row = prevIter.next().copy()
                  buffer.offer(row)
                } else {
                  // Parent exhausted; signal completion to the others.
                  prevIter = null
                }
              }
            }
          } else {
            // Non-owner: drain the buffer, or take over ownership if it runs dry.
            if (!buffer.isEmpty) {
              row = buffer.poll()
            } else {
              buffer.synchronized {
                if (!buffer.isEmpty) {
                  row = buffer.poll()
                } else if (prevIter != null) {
                  // Buffer empty but parent not exhausted: become the owner.
                  owner = myid
                  hasNext
                }
                // else: parent exhausted and buffer drained -- we are done.
              }
            }
          }
        }
        row != null
      }
      override def next(): InternalRow = {
        val r = row
        row = null
        r
      }
    }
  }
}
```
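The core idea can be sketched outside Spark as well. The following is a minimal illustration (class and method names here are hypothetical, not from the PR) of the compute-once pattern CachedOnceRDD uses: the first consumer drains the source iterator and buffers each element in a `ConcurrentLinkedQueue`, so a later consumer can replay them without recomputing the source.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Illustrative sketch: the first consumer computes and buffers, a later
// consumer replays the buffer. (Single-threaded here; CachedOnceRDD adds
// the locking needed for concurrent consumers.)
class CachedOnceIterator[T](source: Iterator[T]) {
  private val buffer = new ConcurrentLinkedQueue[T]

  // First pass: pull from the source, stashing each element in the buffer.
  def firstPass(): Iterator[T] = new Iterator[T] {
    def hasNext: Boolean = source.hasNext
    def next(): T = { val v = source.next(); buffer.offer(v); v }
  }

  // Replay pass: consume what the first pass buffered, no recomputation.
  def replay(): Iterator[T] = new Iterator[T] {
    def hasNext: Boolean = !buffer.isEmpty
    def next(): T = buffer.poll()
  }
}

// The source is lazy, so we can count how often it actually runs.
var computations = 0
val src = Iterator.tabulate(5) { i => computations += 1; i * i }
val cached = new CachedOnceIterator(src)
val first = cached.firstPass().toList   // drains the source, fills the buffer
val second = cached.replay().toList     // same elements, computed only once
```

Both passes see the same five rows, but the source function runs only five times in total, which is what saves the redundant recomputation (and the extra storage of a full RDD cache).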
And we need to change this:
```
- val childResults = child.execute().map(_.copy())
+ val childResults = new CachedOnceRDD(child.execute())
```
cc @rxin @marmbrus