Z1Wu opened a new issue, #6469:
URL: https://github.com/apache/kyuubi/issues/6469

   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   
   
   ### Search before asking
   
   - [X] I have searched in the 
[issues](https://github.com/apache/kyuubi/issues?q=is%3Aissue) and found no 
similar issues.
   
   
   ### Describe the bug
   
   # Problem 
   
   When fetching a large result set using the feature introduced by 
https://github.com/apache/kyuubi/issues/5377, the driver runs out of memory 
(OOM) if too many `RecordReaderIterator[OrcStruct]` instances are held in 
memory at once.
   
   In the current implementation, the number of `RecordReaderIterator[OrcStruct]` 
instances is determined by the number of ORC files written by the insert 
command, which is hard to control when running with AQE or other 
configurations.
   
   
https://github.com/apache/kyuubi/blob/95ed74821c3a2d3a3f402033de7b463966a4bc28/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala#L77-L79
   
   
   Simple script to reproduce the problem:
   
   ``` sh
   
   # connect to kyuubi server and enable fetch file using FetchOrcStatement
   beeline -u 'jdbc:hive2://<kyuubi-server>:<port>/?spark.executor.instances=40;kyuubi.operation.result.saveToFile.enabled=true;kyuubi.operation.result.saveToFile.minSize=1;spark.submit.deployMode=client;spark.driver.memory=5g;spark.sql.catalog.tpcds=org.apache.kyuubi.spark.connector.tpcds.TPCDSCatalog;' \
     -e 'select * from tpcds.sf3000.catalog_returns;'
   
   ```
   
   Error stack:
   
   ```
   Error: org.apache.kyuubi.KyuubiSQLException: 
org.apache.kyuubi.KyuubiSQLException: Error operating ExecuteStatement: 
java.lang.OutOfMemoryError: Java heap space
        at 
org.apache.orc.impl.RecordReaderUtils.readRanges(RecordReaderUtils.java:423)
        at 
org.apache.orc.impl.RecordReaderUtils.readDiskRanges(RecordReaderUtils.java:491)
        at 
org.apache.orc.impl.RecordReaderUtils$DefaultDataReader.readFileData(RecordReaderUtils.java:103)
        at 
org.apache.orc.impl.reader.StripePlanner.readData(StripePlanner.java:152)
        at 
org.apache.orc.impl.RecordReaderImpl.readStripe(RecordReaderImpl.java:1149)
        at 
org.apache.orc.impl.RecordReaderImpl.advanceStripe(RecordReaderImpl.java:1187)
        at 
org.apache.orc.impl.RecordReaderImpl.advanceToNextRow(RecordReaderImpl.java:1222)
        at 
org.apache.orc.impl.RecordReaderImpl.<init>(RecordReaderImpl.java:254)
        at org.apache.orc.impl.ReaderImpl.rows(ReaderImpl.java:875)
        at 
org.apache.orc.mapreduce.OrcMapreduceRecordReader.<init>(OrcMapreduceRecordReader.java:65)
        at 
org.apache.orc.mapreduce.OrcMapreduceRecordReader.<init>(OrcMapreduceRecordReader.java:59)
        at 
org.apache.orc.mapreduce.OrcInputFormat.createRecordReader(OrcInputFormat.java:72)
        at 
org.apache.kyuubi.engine.spark.operation.OrcFileIterator.getOrcFileIterator(FetchOrcStatement.scala:118)
        at 
org.apache.kyuubi.engine.spark.operation.OrcFileIterator.$anonfun$iters$1(FetchOrcStatement.scala:79)
        at 
org.apache.kyuubi.engine.spark.operation.OrcFileIterator$$Lambda$4101/1725650760.apply(Unknown
 Source)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.TraversableLike$$Lambda$29/1297978429.apply(Unknown 
Source)
        at scala.collection.immutable.List.foreach(List.scala:431)
   ....
   ```
   Heap dump when the OOM occurs:
   
   
![image](https://github.com/apache/kyuubi/assets/21239012/b0bb191c-e8bb-42bd-8c99-e2442a0091f0)
   
   
   The heap dump shows that when a `RecordReaderIterator` is initialized, the 
`OrcMapreduceRecordReader` inside it pre-fetches some rows, which occupy 
substantial memory.
   
   # Possible Solution 
   
   Instead of initializing every `RecordReaderIterator` when `OrcFileIterator` 
is created, we can initialize each `RecordReaderIterator` lazily, so that the 
driver holds only one `RecordReaderIterator` in memory at a time: the one 
reading the file that the client is currently fetching.
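
To make the lazy-initialization idea concrete, here is a minimal sketch in plain Java. It is not Kyuubi code and not the actual fix: `LazyChainedIterator`, `LazyChainDemo`, `fakeFile`, and the `opened` counter are hypothetical names used only for illustration, and in-memory lists stand in for ORC files. Each file is represented by a `Supplier` that "opens" it on demand, so nothing is opened until the rows of the previous file are exhausted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

// Hypothetical helper (not part of Kyuubi): chains per-file iterators, but
// opens each one only when the previous one has no rows left.
final class LazyChainedIterator<T> implements Iterator<T> {
    private final Iterator<Supplier<Iterator<T>>> openers;
    private Iterator<T> current; // at most one per-file iterator is referenced

    LazyChainedIterator(List<Supplier<Iterator<T>>> openers) {
        // Only the suppliers are stored here; no file is opened yet.
        this.openers = openers.iterator();
    }

    @Override
    public boolean hasNext() {
        // Advance to the next file only when the current one is exhausted.
        // A real reader would also need to be closed before being replaced.
        while ((current == null || !current.hasNext()) && openers.hasNext()) {
            current = openers.next().get();
        }
        return current != null && current.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }
}

final class LazyChainDemo {
    // Counts how many fake files have been opened so far.
    static int opened = 0;

    // Stand-in for opening one ORC file: the iterator is created on demand.
    static Supplier<Iterator<Integer>> fakeFile(Integer... rows) {
        List<Integer> data = Arrays.asList(rows);
        return () -> {
            opened++;
            return data.iterator();
        };
    }

    public static void main(String[] args) {
        List<Supplier<Iterator<Integer>>> files = new ArrayList<>();
        files.add(fakeFile(1, 2));
        files.add(fakeFile());      // an empty file is opened and skipped
        files.add(fakeFile(3));

        Iterator<Integer> rows = new LazyChainedIterator<>(files);
        System.out.println("opened after construction: " + opened);
        while (rows.hasNext()) {
            System.out.println("row " + rows.next() + ", files opened so far: " + opened);
        }
    }
}
```

With this shape, the number of readers alive at once no longer depends on how many files the insert command wrote. A real implementation would presumably also close each reader as soon as it is exhausted, which this sketch omits.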
   
   
   
   ### Affects Version(s)
   
   master
   
   ### Kyuubi Server Log Output
   
   _No response_
   
   ### Kyuubi Engine Log Output
   
   ```logtalk
   Error: org.apache.kyuubi.KyuubiSQLException: 
org.apache.kyuubi.KyuubiSQLException: Error operating ExecuteStatement: 
java.lang.OutOfMemoryError: Java heap space
        at 
org.apache.orc.impl.RecordReaderUtils.readRanges(RecordReaderUtils.java:423)
        at 
org.apache.orc.impl.RecordReaderUtils.readDiskRanges(RecordReaderUtils.java:491)
        at 
org.apache.orc.impl.RecordReaderUtils$DefaultDataReader.readFileData(RecordReaderUtils.java:103)
        at 
org.apache.orc.impl.reader.StripePlanner.readData(StripePlanner.java:152)
        at 
org.apache.orc.impl.RecordReaderImpl.readStripe(RecordReaderImpl.java:1149)
        at 
org.apache.orc.impl.RecordReaderImpl.advanceStripe(RecordReaderImpl.java:1187)
        at 
org.apache.orc.impl.RecordReaderImpl.advanceToNextRow(RecordReaderImpl.java:1222)
        at 
org.apache.orc.impl.RecordReaderImpl.<init>(RecordReaderImpl.java:254)
        at org.apache.orc.impl.ReaderImpl.rows(ReaderImpl.java:875)
        at 
org.apache.orc.mapreduce.OrcMapreduceRecordReader.<init>(OrcMapreduceRecordReader.java:65)
        at 
org.apache.orc.mapreduce.OrcMapreduceRecordReader.<init>(OrcMapreduceRecordReader.java:59)
        at 
org.apache.orc.mapreduce.OrcInputFormat.createRecordReader(OrcInputFormat.java:72)
        at 
org.apache.kyuubi.engine.spark.operation.OrcFileIterator.getOrcFileIterator(FetchOrcStatement.scala:118)
        at 
org.apache.kyuubi.engine.spark.operation.OrcFileIterator.$anonfun$iters$1(FetchOrcStatement.scala:79)
        at 
org.apache.kyuubi.engine.spark.operation.OrcFileIterator$$Lambda$4101/1725650760.apply(Unknown
 Source)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.TraversableLike$$Lambda$29/1297978429.apply(Unknown 
Source)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at 
scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:38)
        at 
scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:38)
        at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:47)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at 
org.apache.kyuubi.engine.spark.operation.OrcFileIterator.<init>(FetchOrcStatement.scala:79)
        at 
org.apache.kyuubi.engine.spark.operation.FetchOrcStatement.getIterator(FetchOrcStatement.scala:63)
        at 
org.apache.kyuubi.engine.spark.operation.ExecuteStatement.collectAsIterator(ExecuteStatement.scala:199)
        at 
org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:99)
        at 
org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$Lambda$3260/474749226.apply$mcV$sp(Unknown
 Source)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at 
org.apache.kyuubi.engine.spark.operation.SparkOperation.$anonfun$withLocalProperties$1(SparkOperation.scala:166)
        at 
org.apache.kyuubi.engine.spark.operation.SparkOperation$$Lambda$3261/410226299.apply(Unknown
 Source)
   
        at 
org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69)
        at 
org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.$anonfun$applyOrElse$1(SparkOperation.scala:202)
        at org.apache.kyuubi.Utils$.withLockRequired(Utils.scala:432)
        at 
org.apache.kyuubi.operation.AbstractOperation.withLockRequired(AbstractOperation.scala:52)
        at 
org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.applyOrElse(SparkOperation.scala:190)
        at 
org.apache.kyuubi.engine.spark.operation.SparkOperation$$anonfun$onError$1.applyOrElse(SparkOperation.scala:183)
        at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
        at 
org.apache.kyuubi.engine.spark.operation.ExecuteStatement.executeStatement(ExecuteStatement.scala:104)
        at 
org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$anon$1.run(ExecuteStatement.scala:115)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.OutOfMemoryError: Java heap space
        at 
org.apache.orc.impl.RecordReaderUtils.readRanges(RecordReaderUtils.java:423)
        at 
org.apache.orc.impl.RecordReaderUtils.readDiskRanges(RecordReaderUtils.java:491)
        at 
org.apache.orc.impl.RecordReaderUtils$DefaultDataReader.readFileData(RecordReaderUtils.java:103)
        at 
org.apache.orc.impl.reader.StripePlanner.readData(StripePlanner.java:152)
        at 
org.apache.orc.impl.RecordReaderImpl.readStripe(RecordReaderImpl.java:1149)
        at 
org.apache.orc.impl.RecordReaderImpl.advanceStripe(RecordReaderImpl.java:1187)
        at 
org.apache.orc.impl.RecordReaderImpl.advanceToNextRow(RecordReaderImpl.java:1222)
        at 
org.apache.orc.impl.RecordReaderImpl.<init>(RecordReaderImpl.java:254)
        at org.apache.orc.impl.ReaderImpl.rows(ReaderImpl.java:875)
        at 
org.apache.orc.mapreduce.OrcMapreduceRecordReader.<init>(OrcMapreduceRecordReader.java:65)
        at 
org.apache.orc.mapreduce.OrcMapreduceRecordReader.<init>(OrcMapreduceRecordReader.java:59)
        at 
org.apache.orc.mapreduce.OrcInputFormat.createRecordReader(OrcInputFormat.java:72)
        at 
org.apache.kyuubi.engine.spark.operation.OrcFileIterator.getOrcFileIterator(FetchOrcStatement.scala:118)
        at 
org.apache.kyuubi.engine.spark.operation.OrcFileIterator.$anonfun$iters$1(FetchOrcStatement.scala:79)
        at 
org.apache.kyuubi.engine.spark.operation.OrcFileIterator$$Lambda$4101/1725650760.apply(Unknown
 Source)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.TraversableLike$$Lambda$29/1297978429.apply(Unknown 
Source)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at 
scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:38)
        at 
scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:38)
        at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:47)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at 
org.apache.kyuubi.engine.spark.operation.OrcFileIterator.<init>(FetchOrcStatement.scala:79)
        at 
org.apache.kyuubi.engine.spark.operation.FetchOrcStatement.getIterator(FetchOrcStatement.scala:63)
        at 
org.apache.kyuubi.engine.spark.operation.ExecuteStatement.collectAsIterator(ExecuteStatement.scala:199)
        at 
org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:99)
        at 
org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$Lambda$3260/474749226.apply$mcV$sp(Unknown
 Source)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at 
org.apache.kyuubi.engine.spark.operation.SparkOperation.$anonfun$withLocalProperties$1(SparkOperation.scala:166)
        at 
org.apache.kyuubi.engine.spark.operation.SparkOperation$$Lambda$3261/410226299.apply(Unknown
 Source)
   
        at 
org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69)
        at 
org.apache.kyuubi.operation.ExecuteStatement.waitStatementComplete(ExecuteStatement.scala:135)
        at 
org.apache.kyuubi.operation.ExecuteStatement.$anonfun$runInternal$1(ExecuteStatement.scala:173)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748) (state=,code=0)
   ```
   
   
   ### Kyuubi Server Configurations
   
   _No response_
   
   ### Kyuubi Engine Configurations
   
   _No response_
   
   ### Additional context
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes. I would be willing to submit a PR with guidance from the Kyuubi 
community to fix.
   - [ ] No. I cannot submit a PR at this time.


