TakawaAkirayo commented on code in PR #46182:
URL: https://github.com/apache/spark/pull/46182#discussion_r1584064102


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -344,35 +357,79 @@ object SparkConnectService extends Logging {
   private def startGRPCService(): Unit = {
     val debugMode = 
SparkEnv.get.conf.getBoolean("spark.connect.grpc.debug.enabled", true)
     val bindAddress = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_ADDRESS)
-    val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
-    val sb = bindAddress match {
-      case Some(hostname) =>
-        logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}")
-        NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
-      case _ => NettyServerBuilder.forPort(port)
+    val startPort = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
+    val sparkConnectService = new SparkConnectService(debugMode)
+    val protoReflectionService = if (debugMode) 
Some(ProtoReflectionService.newInstance()) else None
+    val configuredInterceptors = 
SparkConnectInterceptorRegistry.createConfiguredInterceptors()
+
+    val startService = (port: Int) => {
+      val sb = bindAddress match {
+        case Some(hostname) =>
+          logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}")
+          NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
+        case _ => NettyServerBuilder.forPort(port)
+      }
+      
sb.maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt)
+        .addService(sparkConnectService)
+
+      // Add all registered interceptors to the server builder.
+      SparkConnectInterceptorRegistry.chainInterceptors(sb, 
configuredInterceptors)

Review Comment:
   > Why this change? I'm not sure I understand why the interceptor registry 
needs to be modified.
   
   @grundprinzip Before this change, the 
SparkConnectInterceptorRegistry.chainInterceptors() would internally invoke 
createConfiguredInterceptors(). Therefore, if 
SparkConnectInterceptorRegistry.chainInterceptors() was placed within the code 
block (startServiceFn), it could be retried multiple times, resulting in the 
creation of configured interceptors multiple times.
   
   After this change, the createConfiguredInterceptors() method will only be 
invoked once if there are retries.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to