Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/8662#issuecomment-140989401
  
    Here are some benchmarks of the different approaches:
    ```
    Without fix:
    Number of udfs: 0 - 0.142887830734
    Number of udfs: 1 - 0.948309898376
    Number of udfs: 2 - 2.079007864
    Number of udfs: 3 - 4.02105379105
    Number of udfs: 4 - 8.20960092545
    Number of udfs: 5 - 17.1744220257
    
    This PR (synchronized batch):
    Number of udfs: 0 - 0.146151065826
    Number of udfs: 1 - 1.73253297806
    Number of udfs: 2 - 2.73584198952
    Number of udfs: 3 - 3.70912384987
    Number of udfs: 4 - 5.21050810814
    Number of udfs: 5 - 6.24335598946
    
    Pass row around:
    Number of udfs: 0 - 0.150754928589
    Number of udfs: 1 - 2.33548307419
    Number of udfs: 2 - 3.16792798042
    Number of udfs: 3 - 4.92886400223
    Number of udfs: 4 - 5.86024093628
    Number of udfs: 5 - 6.71114897728
    
    Old RDD cache:
    Number of udfs: 0 - 0.187371969223
    Number of udfs: 1 - 1.12094593048
    Number of udfs: 2 - 1.82404088974
    Number of udfs: 3 - 3.20661616325
    Number of udfs: 4 - 3.62113809586
    Number of udfs: 5 - 4.25435996056
    
    Using CachedOnceRDD:
    Number of udfs: 0 - 0.143986940384
    Number of udfs: 1 - 1.10472989082
    Number of udfs: 2 - 1.96430683136
    Number of udfs: 3 - 2.14014697075
    Number of udfs: 4 - 2.37295603752
    Number of udfs: 5 - 2.8883831501
    ```
    
    It seems that passing rows into Python is slower than the current approach (and will be much slower with wider tables).
    
    Using a cache is faster than the current approach because of the asynchronous pipeline. CachedOnceRDD is faster than the RDD cache, and it also needs less memory.
    
    The CachedOnceRDD looks like this:
    ```
    class CachedOnceRDD(prev: RDD[InternalRow]) extends RDD[InternalRow](prev) {
      override val partitioner = firstParent[InternalRow].partitioner
      override def getPartitions: Array[Partition] = firstParent[InternalRow].partitions

      // Number of consumers that have called compute(). The "owner" is the
      // consumer currently pulling from the parent iterator; the others read
      // the rows the owner has already buffered.
      var visits: Int = 0
      var owner: Int = 0
      var prevIter: Iterator[InternalRow] = null
      var buffer: java.util.Queue[InternalRow] = null

      override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
        var myid: Int = 0
        synchronized {
          visits += 1
          myid = visits
          if (buffer == null) {
            // First consumer: compute the parent partition once and become the owner.
            prevIter = prev.iterator(split, context)
            buffer = new java.util.concurrent.ConcurrentLinkedQueue[InternalRow]
            owner = myid
          }
        }
        new Iterator[InternalRow] {
          var row: InternalRow = null
          override def hasNext: Boolean = {
            if (row == null) {
              if (owner == myid) {
                // Owner: pull the next row from the parent and leave a copy in
                // the buffer for the trailing consumers.
                buffer.synchronized {
                  if (prevIter != null) {
                    if (prevIter.hasNext) {
                      row = prevIter.next().copy()
                      buffer.offer(row)
                    } else {
                      prevIter = null  // parent iterator exhausted
                    }
                  }
                }
              } else if (!buffer.isEmpty) {
                row = buffer.poll()
              } else {
                buffer.synchronized {
                  if (!buffer.isEmpty) {
                    row = buffer.poll()
                  } else if (prevIter != null) {
                    // Buffer drained but the parent is not exhausted: this
                    // consumer has caught up with the owner, so take over
                    // ownership and pull from the parent directly.
                    owner = myid
                    hasNext
                  }
                }
              }
            }
            row != null
          }

          override def next(): InternalRow = {
            val r = row
            row = null
            r
          }
        }
      }
    }
    ```
    
    And we need this one-line change:
    ```
    -    val childResults = child.execute().map(_.copy())
    +    val childResults = new CachedOnceRDD(child.execute())
    ```
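    The single-pass sharing idea can be sketched without Spark, using plain Scala iterators (a simplified, two-queue variant; the name `TeeOnce` and the shape of the API are illustrative, not from this PR): whichever consumer runs ahead pulls from the source and queues a copy for the other, so the source is consumed exactly once and only the gap between the two consumers is held in memory.
    ```scala
    import java.util.ArrayDeque

    // Split one single-pass iterator into two views that each see every
    // element, materializing only the gap between the faster and the
    // slower view. Unlike the RDD version, elements are not copied here;
    // Spark must copy() because InternalRow objects are mutable.
    class TeeOnce[T](source: Iterator[T]) {
      // pending(i) holds elements the *other* view has read ahead of view i.
      private val pending = Array(new ArrayDeque[T], new ArrayDeque[T])

      private def view(me: Int): Iterator[T] = new Iterator[T] {
        def hasNext: Boolean = !pending(me).isEmpty || source.hasNext
        def next(): T = {
          if (!pending(me).isEmpty) {
            pending(me).poll()        // catch up from the backlog first
          } else {
            val t = source.next()     // this view is ahead: advance the source
            pending(1 - me).add(t)    // leave a copy for the other view
            t
          }
        }
      }

      val first: Iterator[T] = view(0)
      val second: Iterator[T] = view(1)
    }
    ```
    Either view can run ahead of the other in any order; the source is still traversed only once.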
    
    cc @rxin @marmbrus 

