WweiL commented on code in PR #41791:
URL: https://github.com/apache/spark/pull/41791#discussion_r1256384588


##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala:
##########
@@ -18,30 +18,127 @@
 package org.apache.spark.sql.kafka010
 
 import java.util.Locale
+import java.util.concurrent.ExecutionException
 
 import scala.collection.JavaConverters._
 
 import org.mockito.Mockito.{mock, when}
 
-import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException, SparkFunSuite, 
TestUtils}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.streaming.StreamingQueryException
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-class KafkaSourceProviderSuite extends SparkFunSuite {
-
+class KafkaSourceProviderSuite extends SparkFunSuite with SharedSparkSession {
   private val expected = "1111"
 
+  protected var testUtils: KafkaTestUtils = _
+
   override protected def afterEach(): Unit = {
-    SparkEnv.set(null)
     super.afterEach()
   }
 
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils
+    testUtils.setup()
+  }

Review Comment:
   Is there a reason we need to put this test in `KafkaSourceProviderSuite` 
and add these `beforeAll` and `afterAll` functions? Is it possible to put it in 
`KafkaMicroBatchSourceSuite`?



##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala:
##########
@@ -18,30 +18,127 @@
 package org.apache.spark.sql.kafka010
 
 import java.util.Locale
+import java.util.concurrent.ExecutionException
 
 import scala.collection.JavaConverters._
 
 import org.mockito.Mockito.{mock, when}
 
-import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException, SparkFunSuite, 
TestUtils}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.streaming.StreamingQueryException
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-class KafkaSourceProviderSuite extends SparkFunSuite {
-
+class KafkaSourceProviderSuite extends SparkFunSuite with SharedSparkSession {
   private val expected = "1111"
 
+  protected var testUtils: KafkaTestUtils = _
+
   override protected def afterEach(): Unit = {
-    SparkEnv.set(null)
     super.afterEach()
   }
 
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils
+    testUtils.setup()
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      if (testUtils != null) {
+        testUtils.teardown()
+        testUtils = null
+      }
+    } finally {
+      super.afterAll()
+    }
+  }
+
   test("batch mode - options should be handled as case-insensitive") {
     verifyFieldsInBatch(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, 
batch => {
       assert(expected.toLong === batch.pollTimeoutMs)
     })
   }
 
+  /*
+    the goal of this test is to verify the functionality of the aws msk IAM 
auth
+    how this test works:
+    if testType contains source/sink, kafka is used as a source/sink option 
respectively
+    if testType contains stream/batch, it is used either in readStream/read or 
writeStream/write
+
+    In each case, we test that the library paths are discoverable since
+    if the library was not to be found, another error message would be thrown.
+    Although this broker exists, it does not have IAM capabilities and thus
+    it is expected that a timeout error will be thrown.
+  */
+  Seq("source and stream", "sink and stream",
+    "source and batch", "sink and batch").foreach { testType =>
+    test(s"test MSK IAM auth on kafka '$testType' side") {
+      val options: Map[String, String] = Map(
+        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+        "subscribe" -> "msk-123",
+        "startingOffsets" -> "earliest",
+        "kafka.sasl.mechanism" -> "AWS_MSK_IAM",
+        "kafka.sasl.jaas.config" ->
+          "software.amazon.msk.auth.iam.IAMLoginModule required;",
+        "kafka.security.protocol" -> "SASL_SSL",
+        "kafka.sasl.client.callback.handler.class" ->
+          "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
+        "retries" -> "0",
+        "kafka.request.timeout.ms" -> "3000",
+        "kafka.default.api.timeout.ms" -> "3000",
+        "kafka.max.block.ms" -> "3000"
+      )
+
+      testUtils.createTopic(options("subscribe"))

Review Comment:
   nit: this seems to be a better way of doing this: 
https://github.com/apache/spark/blob/fcc846e82bfb5cea61361b973ec382a5bd07c87a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala#L206C22-L216



##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala:
##########
@@ -18,30 +18,124 @@
 package org.apache.spark.sql.kafka010
 
 import java.util.Locale
+import java.util.concurrent.ExecutionException
 
 import scala.collection.JavaConverters._
 
 import org.mockito.Mockito.{mock, when}
 
-import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException, SparkFunSuite, 
TestUtils}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.streaming.StreamingQueryException
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-class KafkaSourceProviderSuite extends SparkFunSuite {
-
+class KafkaSourceProviderSuite extends SparkFunSuite with SharedSparkSession {
   private val expected = "1111"
 
+  protected var testUtils: KafkaTestUtils = _
+
   override protected def afterEach(): Unit = {
-    SparkEnv.set(null)
     super.afterEach()
   }
 
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils
+    testUtils.setup()
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      if (testUtils != null) {
+        testUtils.teardown()
+        testUtils = null
+      }
+    } finally {
+      super.afterAll()
+    }
+  }
+
   test("batch mode - options should be handled as case-insensitive") {
     verifyFieldsInBatch(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, 
batch => {
       assert(expected.toLong === batch.pollTimeoutMs)
     })
   }
 
+  /*
+    the goal of this test is to verify the functionality of the aws msk IAM 
auth
+    how this test works:
+    if testType contains source/sink, kafka is used as a source/sink option 
respectively
+    if testType contains stream/batch, it is used either in readStream/read or 
writeStream/write
+
+    In each case, we test that the library paths are discoverable since
+    if the library was not to be found, another error message would be thrown.
+    Although this broker exists, it does not have IAM capabilities and thus
+    it is expected that a timeout error will be thrown.
+  */
+  Seq("source and stream", "sink and stream",
+    "source and batch", "sink and batch").foreach { testType =>
+    test(s"test MSK IAM auth on kafka '$testType' side") {
+      val options: Map[String, String] = Map(
+        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+        "subscribe" -> "msk-123",
+        "startingOffsets" -> "earliest",
+        "kafka.sasl.mechanism" -> "AWS_MSK_IAM",
+        "kafka.sasl.jaas.config" ->
+          "software.amazon.msk.auth.iam.IAMLoginModule required;",
+        "kafka.security.protocol" -> "SASL_SSL",
+        "kafka.sasl.client.callback.handler.class" ->
+          "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
+        "retries" -> "0"
+      )
+
+      testUtils.createTopic(options("subscribe"))
+
+      var e: Throwable = null
+      if (testType.contains("stream")) {
+        if (testType.contains("source")) {
+          e = intercept[StreamingQueryException] {
+            spark.readStream.format("kafka").options(options).load()
+              .writeStream.format("console").start().processAllAvailable()
+          }
+          TestUtils.assertExceptionMsg(e, "Timed out waiting for a node 
assignment")

Review Comment:
   I think it's fine at this moment, since we just want to verify whether it 
works or not. In a real implementation, I'm not sure if we could catch that 
error somewhere and throw a nicer one.



##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala:
##########
@@ -18,30 +18,108 @@
 package org.apache.spark.sql.kafka010
 
 import java.util.Locale
-
-import scala.collection.JavaConverters._
+import java.util.concurrent.ExecutionException
 
 import org.mockito.Mockito.{mock, when}
 
-import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException, SparkFunSuite, 
TestUtils}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.streaming.StreamingQueryException
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-class KafkaSourceProviderSuite extends SparkFunSuite {
+class KafkaSourceProviderSuite extends SparkFunSuite with SharedSparkSession {
+  import scala.collection.JavaConverters._
 
   private val expected = "1111"
 
+  protected var testUtils: KafkaTestUtils = _
+
   override protected def afterEach(): Unit = {
-    SparkEnv.set(null)
     super.afterEach()
   }
 
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils
+    testUtils.setup()
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      if (testUtils != null) {
+        testUtils.teardown()
+        testUtils = null
+      }
+    } finally {
+      super.afterAll()
+    }
+  }
+
   test("batch mode - options should be handled as case-insensitive") {
     verifyFieldsInBatch(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, 
batch => {
       assert(expected.toLong === batch.pollTimeoutMs)
     })
   }
 
+  /* if testType contains source/sink, kafka is used as a source/sink option 
respectively
+  if testType contains stream/batch, it is used either in readStream/read or 
writeStream/write */
+  Seq("source and stream", "sink and stream",
+    "source and batch", "sink and batch").foreach { testType =>
+    test(s"test MSK IAM auth on kafka '$testType' side") {
+      val options: Map[String, String] = Map(
+        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+        "subscribe" -> "msk-123",
+        "startingOffsets" -> "earliest",
+        "kafka.sasl.mechanism" -> "AWS_MSK_IAM",
+        "kafka.sasl.jaas.config" ->
+          "software.amazon.msk.auth.iam.IAMLoginModule required;",
+        "kafka.security.protocol" -> "SASL_SSL",
+        "kafka.sasl.client.callback.handler.class" ->
+          "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
+      )
+
+      var e: Throwable = null
+      if (testType.contains("stream")) {
+        if (testType.contains("source")) {
+          e = intercept[StreamingQueryException] {
+            spark.readStream.format("kafka").options(options).load()
+              .writeStream.format("console").start().processAllAvailable()
+          }
+          TestUtils.assertExceptionMsg(e, "Timed out")
+        } else {
+          e = intercept[StreamingQueryException] {
+            spark.readStream.format("rate").option("rowsPerSecond", 10).load()
+              .withColumn("value", col("value").cast(StringType)).writeStream
+              .format("kafka").options(options).option("checkpointLocation", 
"temp/testing")
+              .option("topic", "msk-123").start().processAllAvailable()
+          }
+          TestUtils.assertExceptionMsg(e, "Timeout")
+        }
+      } else {
+        if (testType.contains("source")) {

Review Comment:
   I agree with Anish, you could create a helper function like 
`verifyMissingOffsetsDontCauseDuplicatedRecords` in the suite you mentioned: 
https://github.com/apache/spark/blob/fcc846e82bfb5cea61361b973ec382a5bd07c87a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala#L97
   
   and either create 4 test cases like these 
https://github.com/apache/spark/blob/fcc846e82bfb5cea61361b973ec382a5bd07c87a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala#L136-L160
   
   or use the `Seq.foreach` approach, just to align the code style.
   



##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala:
##########
@@ -18,30 +18,124 @@
 package org.apache.spark.sql.kafka010
 
 import java.util.Locale
+import java.util.concurrent.ExecutionException
 
 import scala.collection.JavaConverters._
 
 import org.mockito.Mockito.{mock, when}
 
-import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException, SparkFunSuite, 
TestUtils}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.streaming.StreamingQueryException
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-class KafkaSourceProviderSuite extends SparkFunSuite {
-
+class KafkaSourceProviderSuite extends SparkFunSuite with SharedSparkSession {
   private val expected = "1111"
 
+  protected var testUtils: KafkaTestUtils = _
+
   override protected def afterEach(): Unit = {
-    SparkEnv.set(null)
     super.afterEach()
   }
 
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils
+    testUtils.setup()
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      if (testUtils != null) {
+        testUtils.teardown()
+        testUtils = null
+      }
+    } finally {
+      super.afterAll()
+    }
+  }
+
   test("batch mode - options should be handled as case-insensitive") {
     verifyFieldsInBatch(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, 
batch => {
       assert(expected.toLong === batch.pollTimeoutMs)
     })
   }
 
+  /*
+    the goal of this test is to verify the functionality of the aws msk IAM 
auth
+    how this test works:
+    if testType contains source/sink, kafka is used as a source/sink option 
respectively
+    if testType contains stream/batch, it is used either in readStream/read or 
writeStream/write
+
+    In each case, we test that the library paths are discoverable since
+    if the library was not to be found, another error message would be thrown.
+    Although this broker exists, it does not have IAM capabilities and thus
+    it is expected that a timeout error will be thrown.
+  */
+  Seq("source and stream", "sink and stream",
+    "source and batch", "sink and batch").foreach { testType =>
+    test(s"test MSK IAM auth on kafka '$testType' side") {
+      val options: Map[String, String] = Map(
+        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+        "subscribe" -> "msk-123",
+        "startingOffsets" -> "earliest",
+        "kafka.sasl.mechanism" -> "AWS_MSK_IAM",
+        "kafka.sasl.jaas.config" ->
+          "software.amazon.msk.auth.iam.IAMLoginModule required;",
+        "kafka.security.protocol" -> "SASL_SSL",
+        "kafka.sasl.client.callback.handler.class" ->
+          "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
+        "retries" -> "0"
+      )
+
+      testUtils.createTopic(options("subscribe"))
+
+      var e: Throwable = null
+      if (testType.contains("stream")) {
+        if (testType.contains("source")) {
+          e = intercept[StreamingQueryException] {
+            spark.readStream.format("kafka").options(options).load()
+              .writeStream.format("console").start().processAllAvailable()
+          }
+          TestUtils.assertExceptionMsg(e, "Timed out waiting for a node 
assignment")

Review Comment:
   > Essentially, the kafka client keeps calling the 'describeTopics' endpoint 
on the broker while using IAM authentication which times out since it doesn't 
have IAM auth enabled.
   
   If we want to keep the timeout error here, I think it would be good to also 
add this "keeps calling and times out" explanation to the comment above.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to