nihalpot commented on code in PR #41791:
URL: https://github.com/apache/spark/pull/41791#discussion_r1251322171


##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala:
##########
@@ -18,30 +18,108 @@
 package org.apache.spark.sql.kafka010
 
 import java.util.Locale
-
-import scala.collection.JavaConverters._
+import java.util.concurrent.ExecutionException
 
 import org.mockito.Mockito.{mock, when}
 
-import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException, SparkFunSuite, 
TestUtils}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.streaming.StreamingQueryException
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-class KafkaSourceProviderSuite extends SparkFunSuite {
+class KafkaSourceProviderSuite extends SparkFunSuite with SharedSparkSession {
+  import scala.collection.JavaConverters._
 
   private val expected = "1111"
 
+  protected var testUtils: KafkaTestUtils = _
+
   override protected def afterEach(): Unit = {
-    SparkEnv.set(null)
     super.afterEach()
   }
 
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils
+    testUtils.setup()
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      if (testUtils != null) {
+        testUtils.teardown()
+        testUtils = null
+      }
+    } finally {
+      super.afterAll()
+    }
+  }
+
   test("batch mode - options should be handled as case-insensitive") {
     verifyFieldsInBatch(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, 
batch => {
       assert(expected.toLong === batch.pollTimeoutMs)
     })
   }
 
+  /* if testType contains source/sink, kafka is used as a source/sink option 
respectively
+  if testType contains stream/batch, it is used either in readStream/read or 
writeStream/write */
+  Seq("source and stream", "sink and stream",
+    "source and batch", "sink and batch").foreach { testType =>
+    test(s"test MSK IAM auth on kafka '$testType' side") {
+      val options: Map[String, String] = Map(
+        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+        "subscribe" -> "msk-123",
+        "startingOffsets" -> "earliest",
+        "kafka.sasl.mechanism" -> "AWS_MSK_IAM",
+        "kafka.sasl.jaas.config" ->
+          "software.amazon.msk.auth.iam.IAMLoginModule required;",
+        "kafka.security.protocol" -> "SASL_SSL",
+        "kafka.sasl.client.callback.handler.class" ->
+          "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
+      )
+
+      var e: Throwable = null
+      if (testType.contains("stream")) {
+        if (testType.contains("source")) {
+          e = intercept[StreamingQueryException] {
+            spark.readStream.format("kafka").options(options).load()
+              .writeStream.format("console").start().processAllAvailable()
+          }
+          TestUtils.assertExceptionMsg(e, "Timed out")

Review Comment:
   Added a comment in the test description. Although the broker does exist but 
it doesn't have IAM capabilities so the connection times out and throws an 
error. We are testing in this case that these specific classes in the IAM path 
are discoverable, which is how users interact with it (by specifying the class 
paths). Given the limitations of local testing for AWS IAM, I think we can at 
most check for the presence of time out errors.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to