rangadi commented on code in PR #41791:
URL: https://github.com/apache/spark/pull/41791#discussion_r1251179371


##########
connector/kafka-0-10-sql/pom.xml:
##########
@@ -174,6 +174,12 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>

Review Comment:
   Add a short comment about what this is for. 



##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala:
##########
@@ -18,30 +18,108 @@
 package org.apache.spark.sql.kafka010
 
 import java.util.Locale
-
-import scala.collection.JavaConverters._
+import java.util.concurrent.ExecutionException
 
 import org.mockito.Mockito.{mock, when}
 
-import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException, SparkFunSuite, 
TestUtils}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.streaming.StreamingQueryException
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-class KafkaSourceProviderSuite extends SparkFunSuite {
+class KafkaSourceProviderSuite extends SparkFunSuite with SharedSparkSession {
+  import scala.collection.JavaConverters._
 
   private val expected = "1111"
 
+  protected var testUtils: KafkaTestUtils = _
+
   override protected def afterEach(): Unit = {
-    SparkEnv.set(null)
     super.afterEach()
   }
 
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils
+    testUtils.setup()
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      if (testUtils != null) {
+        testUtils.teardown()
+        testUtils = null
+      }
+    } finally {
+      super.afterAll()
+    }
+  }
+
   test("batch mode - options should be handled as case-insensitive") {
     verifyFieldsInBatch(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, 
batch => {
       assert(expected.toLong === batch.pollTimeoutMs)
     })
   }
 
+  /* if testType contains source/sink, kafka is used as a source/sink option 
respectively

Review Comment:
   What is this about? 
   
   Could you add a brief description of the test here? 



##########
docs/structured-streaming-kafka-integration.md:
##########
@@ -1171,3 +1171,14 @@ This can be done several ways. One possibility is to 
provide additional JVM para
         --driver-java-options 
"-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \
         --conf 
spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf
 \
         ...
+
+### IAM authentication
+
+Managed Streaming for Kafka (MSK) is a fully managed service offered by Amazon 
Web Services (AWS) that simplifies the deployment, management, and scaling of 
Apache Kafka clusters. 
+One way to connect to MSK is by using IAM authentication. To facilitate this, 
Spark already ships with the [aws msk 
library](https://github.com/aws/aws-msk-iam-auth) by default so no additional 
dependencies are required.
+To use IAM authentication, you can set the kafka config as in the [library 
documentation](https://github.com/aws/aws-msk-iam-auth). For example:
+
+    --conf spark.kafka.clusters.${cluster}.kafka.security.protocol=SASL_SSL

Review Comment:
   Where are these configs read? 
   Could you use a code snippet like `spark.readStream.format("kafka"). ...` to 
set these? 
   



##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala:
##########
@@ -18,30 +18,108 @@
 package org.apache.spark.sql.kafka010
 
 import java.util.Locale
-
-import scala.collection.JavaConverters._
+import java.util.concurrent.ExecutionException
 
 import org.mockito.Mockito.{mock, when}
 
-import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException, SparkFunSuite, 
TestUtils}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.streaming.StreamingQueryException
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-class KafkaSourceProviderSuite extends SparkFunSuite {
+class KafkaSourceProviderSuite extends SparkFunSuite with SharedSparkSession {
+  import scala.collection.JavaConverters._

Review Comment:
   Why is this moved inside? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to