TakawaAkirayo commented on code in PR #46182:
URL: https://github.com/apache/spark/pull/46182#discussion_r1584064102
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -344,35 +357,79 @@ object SparkConnectService extends Logging {
private def startGRPCService(): Unit = {
val debugMode =
SparkEnv.get.conf.getBoolean("spark.connect.grpc.debug.enabled", true)
val bindAddress = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_ADDRESS)
- val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
- val sb = bindAddress match {
- case Some(hostname) =>
- logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}")
- NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
- case _ => NettyServerBuilder.forPort(port)
+ val startPort = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
+ val sparkConnectService = new SparkConnectService(debugMode)
+ val protoReflectionService = if (debugMode) Some(ProtoReflectionService.newInstance()) else None
+ val configuredInterceptors = SparkConnectInterceptorRegistry.createConfiguredInterceptors()
+
+ val startService = (port: Int) => {
+ val sb = bindAddress match {
+ case Some(hostname) =>
+ logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}")
+ NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
+ case _ => NettyServerBuilder.forPort(port)
+ }
+
+ sb.maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt)
+ .addService(sparkConnectService)
+
+ // Add all registered interceptors to the server builder.
+ SparkConnectInterceptorRegistry.chainInterceptors(sb, configuredInterceptors)
Review Comment:
> Why this change? I'm not sure I understand why the interceptor registry
> needs to be modified.

@grundprinzip Before this change,
SparkConnectInterceptorRegistry.chainInterceptors() internally invoked
createConfiguredInterceptors(). If chainInterceptors() were placed inside the
retried code block (startServiceFn), each retry would therefore create the
configured interceptors again.

After this change, createConfiguredInterceptors() is invoked only once,
outside the retried block, no matter how many retries occur.
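
To illustrate the difference, here is a minimal sketch with hypothetical stand-ins rather than the actual Spark classes: `createInterceptors` plays the role of createConfiguredInterceptors(), `startOnFirstFreePort` is an assumed simplification of the port-retry logic, and the port numbers are only illustrative.

```scala
// Hypothetical stand-ins; not the real Spark Connect classes.
object InterceptorHoistingSketch {
  // Counts how many times the interceptor factory runs.
  var created = 0

  // Stand-in for createConfiguredInterceptors().
  def createInterceptors(): Seq[String] = {
    created += 1
    Seq("auth", "logging")
  }

  // Stand-in for the retry logic: tries successive ports and stops at the
  // first one for which `start` returns true.
  def startOnFirstFreePort(startPort: Int, maxRetries: Int)(
      start: Int => Boolean): Option[Int] =
    (startPort to startPort + maxRetries).find(start)

  // Before: interceptors are built inside the retried block,
  // so they are rebuilt on every attempt.
  def before(busyPorts: Set[Int]): Option[Int] =
    startOnFirstFreePort(15002, 3) { port =>
      val interceptors = createInterceptors()
      interceptors.nonEmpty && !busyPorts.contains(port)
    }

  // After: interceptors are built once, outside the retried block,
  // and the same instances are reused by every attempt.
  def after(busyPorts: Set[Int]): Option[Int] = {
    val interceptors = createInterceptors()
    startOnFirstFreePort(15002, 3) { port =>
      interceptors.nonEmpty && !busyPorts.contains(port)
    }
  }
}
```

With two busy ports, `before` runs the factory on each of the three attempts, while `after` runs it once regardless of how many attempts are needed.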
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]