pan3793 commented on code in PR #52810:
URL: https://github.com/apache/spark/pull/52810#discussion_r2485455395
##########
sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala:
##########
@@ -49,33 +51,51 @@ class SparkConnectStatement(conn: SparkConnectConnection)
extends Statement {
}
override def executeQuery(sql: String): ResultSet = {
- checkOpen()
-
- val df = conn.spark.sql(sql)
- val sparkResult = df.collectResult()
- operationId = sparkResult.operationId
- resultSet = new SparkConnectResultSet(sparkResult, this)
- resultSet
+ val hasResultSet = execute(sql)
+ if (hasResultSet) {
+ assert(resultSet != null)
+ resultSet
+ } else {
+ throw new SQLException("The query does not produce a ResultSet.")
+ }
}
override def executeUpdate(sql: String): Int = {
- checkOpen()
-
- val df = conn.spark.sql(sql)
- val sparkResult = df.collectResult()
- operationId = sparkResult.operationId
- resultSet = null
+ val hasResultSet = execute(sql)
+ if (hasResultSet) {
+ // user are not expected to access the result set in this case,
+ // we must close it to avoid memory leak.
+ resultSet.close()
+ throw new SQLException("The query produces a ResultSet.")
+ } else {
+ assert(resultSet == null)
+ getUpdateCount
+ }
+ }
- // always return 0 because affected rows is not supported yet
- 0
+ private def hasResultSet(sparkResult: SparkResult[_]): Boolean = {
+ // suppose this works in most cases
+ sparkResult.schema.length > 0
}
override def execute(sql: String): Boolean = {
checkOpen()
- // always perform executeQuery and reture a ResultSet
- executeQuery(sql)
- true
+ // stmt can be reused to exeute more than one queries,
+ // reset before executinng new query
Review Comment:
thanks, updated.
##########
sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala:
##########
@@ -49,33 +51,51 @@ class SparkConnectStatement(conn: SparkConnectConnection)
extends Statement {
}
override def executeQuery(sql: String): ResultSet = {
- checkOpen()
-
- val df = conn.spark.sql(sql)
- val sparkResult = df.collectResult()
- operationId = sparkResult.operationId
- resultSet = new SparkConnectResultSet(sparkResult, this)
- resultSet
+ val hasResultSet = execute(sql)
+ if (hasResultSet) {
+ assert(resultSet != null)
+ resultSet
+ } else {
+ throw new SQLException("The query does not produce a ResultSet.")
+ }
}
override def executeUpdate(sql: String): Int = {
- checkOpen()
-
- val df = conn.spark.sql(sql)
- val sparkResult = df.collectResult()
- operationId = sparkResult.operationId
- resultSet = null
+ val hasResultSet = execute(sql)
+ if (hasResultSet) {
+ // user are not expected to access the result set in this case,
+ // we must close it to avoid memory leak.
+ resultSet.close()
+ throw new SQLException("The query produces a ResultSet.")
+ } else {
+ assert(resultSet == null)
+ getUpdateCount
+ }
+ }
- // always return 0 because affected rows is not supported yet
- 0
+ private def hasResultSet(sparkResult: SparkResult[_]): Boolean = {
+ // suppose this works in most cases
+ sparkResult.schema.length > 0
}
override def execute(sql: String): Boolean = {
checkOpen()
- // always perform executeQuery and reture a ResultSet
- executeQuery(sql)
- true
+ // stmt can be reused to exeute more than one queries,
Review Comment:
thanks, updated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]