vicennial commented on code in PR #39361:
URL: https://github.com/apache/spark/pull/39361#discussion_r1063406543


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -17,31 +17,147 @@
 
 package org.apache.spark.sql.connect.client
 
+import scala.language.existentials
+
+import io.grpc.{ManagedChannel, ManagedChannelBuilder}
+import java.net.URI
+
 import org.apache.spark.connect.proto
+import org.apache.spark.sql.connect.common.config.ConnectCommon
+
+/**
+ * Conceptually, the remote Spark session that communicates with the server.
+ */
+class SparkConnectClient(
+    private val userContext: proto.UserContext,
+    private val channel: ManagedChannel) {
 
-class SparkConnectClient(private val userContext: proto.UserContext) {
+  private[this] val stub = 
proto.SparkConnectServiceGrpc.newBlockingStub(channel)
 
   /**
    * Placeholder method.
    * @return
    *   User ID.
    */
   def userId: String = userContext.getUserId()
+
+  /**
+   * Dispatch the [[proto.AnalyzePlanRequest]] to the Spark Connect server.
+   * @return
+   *   A [[proto.AnalyzePlanResponse]] from the Spark Connect server.
+   */
+  def analyze(request: proto.AnalyzePlanRequest): proto.AnalyzePlanResponse =
+    stub.analyzePlan(request)
+
+  /**
+   * Shutdown the client's connection to the server.
+   */
+  def shutdown(): Unit = {
+    channel.shutdownNow()
+  }
 }
 
 object SparkConnectClient {
   def builder(): Builder = new Builder()
 
+  /**
+   * This is a helper class that is used to create a GRPC channel based on 
either a set host and
+   * port or a NameResolver-compliant URI connection string.
+   */
   class Builder() {
     private val userContextBuilder = proto.UserContext.newBuilder()
+    private var _host: String = "localhost"
+    private var _port: Int = ConnectCommon.CONNECT_GRPC_BINDING_PORT
 
     def userId(id: String): Builder = {
       userContextBuilder.setUserId(id)
       this
     }
 
+    def host(host: String): Builder = {
+      require(host != null)
+      _host = host
+      this
+    }
+
+    def port(port: Int): Builder = {
+      _port = port
+      this
+    }
+
+    object URIParams {
+      val PARAM_USER_ID = "user_id"
+      val PARAM_USE_SSL = "use_ssl"
+      val PARAM_TOKEN = "token"
+    }
+
+    private def verifyURI(uri: URI): Unit = {
+      if (uri.getScheme != "sc") {
+        throw new IllegalArgumentException("Scheme for connection URI must be 
'sc'.")
+      }
+      // Java URI considers everything after the authority segment as "path" 
until the
+      // ? (query)/# (fragment) components as shown in the regex
+      // [scheme:][//authority][path][?query][#fragment].
+      // However, with the Spark Connect definition, configuration parameters 
are passed in the
+      // style of the HTTP URL Path Parameter Syntax (e.g.
+      // sc://hostname:port/;param1=value;param2=value).
+      // Thus, we manually parse the "java path" to get the "correct path" and 
configuration
+      // parameters.
+      val pathAndParams = uri.getPath.split(';')
+      if (pathAndParams.nonEmpty && (pathAndParams(0) != "/" && 
pathAndParams(0) != "")) {

Review Comment:
   Yes, if it's empty, then `pathAndParams.nonEmpty` would be false so it won't 
throw. It's tested in this line: 
https://github.com/apache/spark/pull/39361/files#diff-4bb9e2a97d02c47c2c3d9171756101f21f9e24c3b364aba6ec2d0dffc1d8daf1R107



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to