Github user Dooyoung-Hwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22219#discussion_r213597102
  
    --- Diff: sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala ---
    @@ -289,6 +289,14 @@ private[hive] class SparkExecuteStatementOperation(
           sqlContext.sparkContext.cancelJobGroup(statementId)
         }
       }
    +
    +  private def getResultIterator(): Iterator[SparkRow] = {
    +    val (totalRowCount, iterResult) = result.collectCountAndIterator()
    +    val batchCollectLimit =
    +      sqlContext.getConf(SQLConf.THRIFTSERVER_BATCH_COLLECTION_LIMIT.key).toLong
    +    resultList = if (totalRowCount < batchCollectLimit) Some(iterResult.toArray) else None
    +    if (resultList.isDefined) resultList.get.iterator else iterResult
    --- End diff ---
    
    @viirya
    Here is my idea for addressing the problem you commented on.
    
    1. Change the return type of "collectCountAndIterator" to a tuple of (Long, SeqView).
    2. The SeqView is created from the encoded result array (the result of getByteArrayRdd().collect() in SparkPlan) and holds the deserializing operations defined in Dataset.
    3. Change the type of resultList in SparkExecuteStatementOperation to Option[Iterable[SparkRow]], because both Array and SeqView are Iterable.
    4. The Thrift server checks whether the row count exceeds THRIFTSERVER_BATCH_COLLECTION_LIMIT and decides what to cache (a sketch follows this list):
       -> if row count > THRIFTSERVER_BATCH_COLLECTION_LIMIT, resultList caches the SeqView.
       -> otherwise, resultList caches the Array collected from the SeqView.
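    
    To make the decision in step 4 concrete, here is a rough, self-contained sketch. The SparkRow alias, the decode placeholder, and chooseResultCache are illustrative names, not the actual Spark code; only the caching decision mirrors the proposal.
    
        object BatchCollectSketch {
          type SparkRow = Seq[Any]
    
          // Stand-in for the deserializing step the SeqView would hold; in Spark
          // this would decode the byte arrays from getByteArrayRdd().collect().
          private def decode(bytes: Array[Byte]): SparkRow = Seq(bytes.length)
    
          // Returns the Iterable to cache in resultList: a lazy SeqView for large
          // results, a fully materialized Array for small ones.
          def chooseResultCache(
              encoded: Array[Array[Byte]],
              totalRowCount: Long,
              batchCollectLimit: Long): Iterable[SparkRow] = {
            // The view defers decoding until each row is actually consumed.
            val view = encoded.view.map(decode)
            if (totalRowCount > batchCollectLimit) view  // cache the SeqView
            else view.toArray                            // cache the Array
          }
        }
    
    With resultList always defined, getResultIterator() could then simply return resultList.get.iterator in both cases.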
    
    What do you think of this idea?

