grundprinzip commented on code in PR #46182:
URL: https://github.com/apache/spark/pull/46182#discussion_r1583029459
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -344,35 +357,79 @@ object SparkConnectService extends Logging {
private def startGRPCService(): Unit = {
val debugMode =
SparkEnv.get.conf.getBoolean("spark.connect.grpc.debug.enabled", true)
val bindAddress = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_ADDRESS)
- val port = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
- val sb = bindAddress match {
- case Some(hostname) =>
- logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}")
- NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
- case _ => NettyServerBuilder.forPort(port)
+ val startPort = SparkEnv.get.conf.get(CONNECT_GRPC_BINDING_PORT)
+ val sparkConnectService = new SparkConnectService(debugMode)
+ val protoReflectionService = if (debugMode) Some(ProtoReflectionService.newInstance()) else None
+ val configuredInterceptors = SparkConnectInterceptorRegistry.createConfiguredInterceptors()
+
+ val startService = (port: Int) => {
+ val sb = bindAddress match {
+ case Some(hostname) =>
+ logInfo(log"start GRPC service at: ${MDC(HOST, hostname)}")
+ NettyServerBuilder.forAddress(new InetSocketAddress(hostname, port))
+ case _ => NettyServerBuilder.forPort(port)
+ }
+ sb.maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt)
+ .addService(sparkConnectService)
+
+ // Add all registered interceptors to the server builder.
+ SparkConnectInterceptorRegistry.chainInterceptors(sb, configuredInterceptors)
+
+ // If debug mode is configured, load the ProtoReflection service so that tools like
+ // grpcurl can introspect the API for debugging.
+ protoReflectionService.foreach(service => sb.addService(service))
+
+ server = sb.build
+ server.start()
+
+ // It will throw an IllegalStateException if you want to access the binding address
+ // while the server is in a terminated state, so record the actual binding address
+ // immediately after the server starts.
+ // There should only be one address, get the actual binding address
+ // of the server according the `server.port()`
+ bindingAddress = server.getListenSockets.asScala
+ .find(_.isInstanceOf[InetSocketAddress])
+ .get
+ .asInstanceOf[InetSocketAddress]
+
+ (server, server.getPort)
}
- sb.maxInboundMessageSize(SparkEnv.get.conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE).toInt)
- .addService(new SparkConnectService(debugMode))
-
- // Add all registered interceptors to the server builder.
- SparkConnectInterceptorRegistry.chainInterceptors(sb)
- // If debug mode is configured, load the ProtoReflection service so that tools like
- // grpcurl can introspect the API for debugging.
- if (debugMode) {
- sb.addService(ProtoReflectionService.newInstance())
- }
- server = sb.build
- server.start()
+ val maxRetries: Int = SparkEnv.get.conf.get(CONNECT_GRPC_PORT_MAX_RETRIES)
+ Utils.startServiceOnPort[Server](
+ startPort,
+ startService,
+ maxRetries,
+ getClass.getName
+ )
}
// Starts the service
- def start(sc: SparkContext): Unit = {
+ def start(sc: SparkContext): Unit = synchronized {
Review Comment:
Thanks
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]