This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 784f1d0da7f [SPARK-44636][CONNECT] Leave no dangling iterators 784f1d0da7f is described below commit 784f1d0da7f9d96bbc8ab2dda9d9556691012e17 Author: Alice Sayutina <alice.sayut...@databricks.com> AuthorDate: Wed Aug 2 17:41:47 2023 -0400 [SPARK-44636][CONNECT] Leave no dangling iterators ### What changes were proposed in this pull request? Made a minor refactor of the execute functions so that they no longer leave dangling iterators. (Note: we should also do this for SparkResult; however, in almost all cases, leaving its iterators unconsumed should not cause problems.) ### Why are the changes needed? Needed for ongoing work regarding session reattachment. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This is intended to be tested after session reattachment is complete (cc juliuszsompolski). Closes #42298 from cdkrot/dangling_iterators. Lead-authored-by: Alice Sayutina <alice.sayut...@databricks.com> Co-authored-by: Alice Sayutina <cdkr...@gmail.com> Co-authored-by: Juliusz Sompolski <ju...@databricks.com> Signed-off-by: Herman van Hovell <her...@databricks.com> --- .../scala/org/apache/spark/sql/SparkSession.scala | 25 +++++++++++++--------- .../sql/connect/client/SparkConnectClient.scala | 6 ++++++ .../connect/client/SparkConnectClientSuite.scala | 19 ++++++++++++++++ 3 files changed, 40 insertions(+), 10 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index a3d82156a03..59f3f3526ab 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -252,10 +252,12 @@ class SparkSession private[sql] ( .setSql(sqlText) .addAllPosArgs(args.map(toLiteralProto).toIterable.asJava))) val plan = 
proto.Plan.newBuilder().setCommand(cmd) - val responseIter = client.execute(plan.build()) + val responseSeq = client.execute(plan.build()).asScala.toSeq - // Note: .toSeq makes the stream be consumed and closed. - val response = responseIter.asScala.toSeq + // sequence is a lazy stream, force materialize it to make sure it is consumed. + responseSeq.foreach(_ => ()) + + val response = responseSeq .find(_.hasSqlCommandResult) .getOrElse(throw new RuntimeException("SQLCommandResult must be present")) @@ -309,10 +311,12 @@ class SparkSession private[sql] ( .setSql(sqlText) .putAllArgs(args.asScala.mapValues(toLiteralProto).toMap.asJava))) val plan = proto.Plan.newBuilder().setCommand(cmd) - val responseIter = client.execute(plan.build()) + val responseSeq = client.execute(plan.build()).asScala.toSeq + + // sequence is a lazy stream, force materialize it to make sure it is consumed. + responseSeq.foreach(_ => ()) - // Note: .toSeq makes the stream be consumed and closed. - val response = responseIter.asScala.toSeq + val response = responseSeq .find(_.hasSqlCommandResult) .getOrElse(throw new RuntimeException("SQLCommandResult must be present")) @@ -549,14 +553,15 @@ class SparkSession private[sql] ( private[sql] def execute(command: proto.Command): Seq[ExecutePlanResponse] = { val plan = proto.Plan.newBuilder().setCommand(command).build() - client.execute(plan).asScala.toSeq + val seq = client.execute(plan).asScala.toSeq + // sequence is a lazy stream, force materialize it to make sure it is consumed. 
+ seq.foreach(_ => ()) + seq } private[sql] def registerUdf(udf: proto.CommonInlineUserDefinedFunction): Unit = { val command = proto.Command.newBuilder().setRegisterFunction(udf).build() - val plan = proto.Plan.newBuilder().setCommand(command).build() - - client.execute(plan).asScala.foreach(_ => ()) + execute(command) } @DeveloperApi diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala index aac5e6b9cc3..3d20be88888 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala @@ -72,6 +72,12 @@ private[sql] class SparkConnectClient( bstub.analyzePlan(request) } + /** + * Execute the plan and return response iterator. + * + * It returns an open iterator. The caller needs to ensure that this iterator is fully consumed, + * otherwise resources held by a re-attachable query may be left dangling until server timeout. 
+ */ def execute(plan: proto.Plan): java.util.Iterator[proto.ExecutePlanResponse] = { artifactManager.uploadAllClassFileArtifacts() val request = proto.ExecutePlanRequest diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala index ec447cac869..3436037809d 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.connect.client +import java.util.UUID import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ @@ -292,12 +293,30 @@ class DummySparkConnectService() extends SparkConnectServiceGrpc.SparkConnectSer responseObserver: StreamObserver[ExecutePlanResponse]): Unit = { // Reply with a dummy response using the same client ID val requestSessionId = request.getSessionId + val operationId = if (request.hasOperationId) { + request.getOperationId + } else { + UUID.randomUUID().toString + } inputPlan = request.getPlan val response = ExecutePlanResponse .newBuilder() .setSessionId(requestSessionId) + .setOperationId(operationId) .build() responseObserver.onNext(response) + // Reattachable execute must end with ResultComplete + if (request.getRequestOptionsList.asScala.exists { option => + option.hasReattachOptions && option.getReattachOptions.getReattachable == true + }) { + val resultComplete = ExecutePlanResponse + .newBuilder() + .setSessionId(requestSessionId) + .setOperationId(operationId) + .setResultComplete(proto.ExecutePlanResponse.ResultComplete.newBuilder().build()) + .build() + responseObserver.onNext(resultComplete) + } responseObserver.onCompleted() } --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org