iodone commented on code in PR #2678:
URL: https://github.com/apache/incubator-kyuubi/pull/2678#discussion_r878967445
##########
externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/TrinoStatementSuite.scala:
##########
@@ -52,16 +52,16 @@ class TrinoStatementSuite extends WithTrinoContainerServer {
assert(this.schema === trinoStatement.getCurrentDatabase)
val trinoStatement2 = TrinoStatement(trinoContext, kyuubiConf, "use sf1")
- trinoStatement2.execute()
-
+ // if trinoStatement.execute return iterator is lazy, call toArray to
strict evaluation
Review Comment:
`TrinoStatement.execute` returns an iterator, so calling `resultSet.toArray`
uniformly means we do not need to care whether it is lazy or strict
##########
externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoStatement.scala:
##########
@@ -80,63 +74,34 @@ class TrinoStatement(
}
}
- /**
- * Execute sql and return ResultSet.
- */
- def execute(): Iterable[List[Any]] = {
- val rowQueue = new ArrayBlockingQueue[List[Any]](MAX_QUEUED_ROWS)
-
- val dataProcessing = Future[Unit] {
- while (trino.isRunning) {
- val data = trino.currentData().getData()
- if (data != null) {
- data.asScala.map(_.asScala.toList)
- .foreach(e => putOrThrow(rowQueue, e))
+ def execute(): Iterator[List[Any]] = {
+ Iterator.continually {
+ @tailrec
+ def getData(): (Boolean, List[List[Any]]) = {
+ if (trino.isRunning) {
+ val data = trino.currentData().getData()
+ trino.advance()
+ if (data != null) {
+ (true, data.asScala.toList.map(_.asScala.toList))
+ } else {
+ getData()
+ }
+ } else {
+ Verify.verify(trino.isFinished)
+ val finalStatus = trino.finalStatusInfo()
+ if (finalStatus.getError() != null) {
+ throw KyuubiSQLException(
+ s"Query ${finalStatus.getId} failed:
${finalStatus.getError.getMessage}")
Review Comment:
Yes, the `queryId` is generated by the Trino server at the time of the first
query and is unique throughout the query process
##########
externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala:
##########
@@ -77,6 +81,24 @@ class ExecuteStatement(
}
}
+ override def getNextRowSet(order: FetchOrientation, rowSetSize: Int):
TRowSet = {
+ validateDefaultFetchOrientation(order)
+ assertState(OperationState.FINISHED)
+ setHasResultSet(true)
+ (order, incrementalCollect) match {
+ case (FETCH_NEXT, _) => iter.fetchNext()
+ case (FETCH_PRIOR, false) => iter.fetchPrior(rowSetSize)
+ case (FETCH_FIRST, false) => iter.fetchAbsolute(0)
+ case _ =>
+ val mode = if (incrementalCollect) "incremental collect" else "full
collect"
+ throw KyuubiSQLException(s"Fetch orientation[$order] is not supported
in $mode mode")
Review Comment:
For error messages returned to the user, we generally use `KyuubiSQLException`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]