grundprinzip commented on code in PR #41880:
URL: https://github.com/apache/spark/pull/41880#discussion_r1255578300
##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -509,12 +518,20 @@ object SparkConnectClient {
def createChannel(): ManagedChannel = {
val channelBuilder = Grpc.newChannelBuilderForAddress(host, port,
credentials)
+
if (metadata.nonEmpty) {
channelBuilder.intercept(new MetadataHeaderClientInterceptor(metadata))
}
+
+ for (interceptor <- interceptors) {
+ channelBuilder.intercept(interceptor)
+ }
Review Comment:
```suggestion
interceptors.forEach(channelBuilder.intercept(_))
```
##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala:
##########
@@ -16,12 +16,45 @@
*/
package org.apache.spark.sql
+import java.util.concurrent.TimeUnit
+
+import io.grpc.{CallOptions, Channel, ClientCall, ClientInterceptor,
MethodDescriptor, Server}
+import io.grpc.netty.NettyServerBuilder
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.sql.connect.client.DummySparkConnectService
import org.apache.spark.sql.connect.client.util.ConnectFunSuite
/**
* Tests for non-dataframe related SparkSession operations.
*/
-class SparkSessionSuite extends ConnectFunSuite {
+class SparkSessionSuite extends ConnectFunSuite with BeforeAndAfterEach {
+ private var service: DummySparkConnectService = _
+ private var server: Server = _
+
+ private def startDummyServer(port: Int): Unit = {
+ service = new DummySparkConnectService
+ server = NettyServerBuilder
+ .forPort(port)
+ .addService(service)
+ .build()
+ server.start()
+ }
Review Comment:
I think we have something called e2e client test for this that does properly
start the service.
##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -656,6 +654,11 @@ object SparkSession extends Logging {
this
}
+ def interceptor(interceptor: ClientInterceptor): Builder = {
Review Comment:
This is the public facing part of the interface and needs the most extensive
documentation,.
##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -656,6 +654,11 @@ object SparkSession extends Logging {
this
}
+ def interceptor(interceptor: ClientInterceptor): Builder = {
Review Comment:
doc please :)
##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala:
##########
@@ -463,7 +460,18 @@ object SparkConnectClient {
this
}
- def build(): SparkConnectClient = new SparkConnectClient(_configuration)
+ /**
+ * Add an interceptor to be used during channel creation.
+ *
+ * Note that interceptors added last are executed first by grpc.
Review Comment:
```suggestion
* Note that interceptors added last are executed first by gRPC.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]