vicennial commented on code in PR #39361:
URL: https://github.com/apache/spark/pull/39361#discussion_r1062978148
##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -17,31 +17,91 @@
package org.apache.spark.sql.connect.client
+import scala.language.existentials
+
+import io.grpc.{ManagedChannel, ManagedChannelBuilder}
+
import org.apache.spark.connect.proto
+import org.apache.spark.sql.connect.common.config.ConnectCommon
+
+/**
+ * Conceptually the remote spark session that communicates with the server.
+ */
+class SparkConnectClient(
+ private val userContext: proto.UserContext,
+ private val channel: ManagedChannel) {
-class SparkConnectClient(private val userContext: proto.UserContext) {
+ private[this] val stub =
proto.SparkConnectServiceGrpc.newBlockingStub(channel)
/**
* Placeholder method.
* @return
* User ID.
*/
def userId: String = userContext.getUserId()
+
+ /**
+ * Dispatch the [[proto.AnalyzePlanRequest]] to the Spark Connect server.
+ * @return
+ * A [[proto.AnalyzePlanResponse]] from the Spark Connect server.
+ */
+ def analyze(request: proto.AnalyzePlanRequest): proto.AnalyzePlanResponse =
+ stub.analyzePlan(request)
+
+ /**
+ * Shutdown the client's connection to the server.
+ */
+ def shutdown(): Unit = {
+ channel.shutdownNow()
+ }
}
object SparkConnectClient {
def builder(): Builder = new Builder()
+ /**
+ * This is a helper class that is used to create a GRPC channel based on either a set host and
+ * port or a NameResolver-compliant URI connection string.
+ */
class Builder() {
private val userContextBuilder = proto.UserContext.newBuilder()
+ private var _host: String = "localhost"
+ private var _port: Int = ConnectCommon.CONNECT_GRPC_BINDING_PORT
+ private var _connectionString: Option[String] = None
def userId(id: String): Builder = {
userContextBuilder.setUserId(id)
this
}
+ def host(host: String): Builder = {
+ require(host != null)
+ _host = host
+ this
+ }
+
+ def port(port: Int): Builder = {
+ _port = port
+ this
+ }
+
+ /**
+ * Creates the channel with a target connection string, which can be either a valid
+ * NameResolver-compliant URI, or an authority string. Note: The connection string, if used,
+ * will override any host/port settings.
+ */
+ def connectionString(connectionString: String): Builder = {
+ _connectionString = Some(connectionString)
+ this
+ }
Review Comment:
That's right; I've updated it now to follow the Spark Connect spec (modulo
SSL/Auth Token support, which will be done in a follow-up as part of
https://issues.apache.org/jira/browse/SPARK-41917).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]