nihalpot commented on code in PR #41791:
URL: https://github.com/apache/spark/pull/41791#discussion_r1256193973
##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala:
##########
@@ -18,30 +18,124 @@
package org.apache.spark.sql.kafka010
import java.util.Locale
+import java.util.concurrent.ExecutionException
import scala.collection.JavaConverters._
import org.mockito.Mockito.{mock, when}
-import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException, SparkFunSuite,
TestUtils}
+import org.apache.spark.sql.Row
import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.streaming.StreamingQueryException
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
-class KafkaSourceProviderSuite extends SparkFunSuite {
-
+class KafkaSourceProviderSuite extends SparkFunSuite with SharedSparkSession {
private val expected = "1111"
+ protected var testUtils: KafkaTestUtils = _
+
override protected def afterEach(): Unit = {
- SparkEnv.set(null)
super.afterEach()
}
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ testUtils = new KafkaTestUtils
+ testUtils.setup()
+ }
+
+ override protected def afterAll(): Unit = {
+ try {
+ if (testUtils != null) {
+ testUtils.teardown()
+ testUtils = null
+ }
+ } finally {
+ super.afterAll()
+ }
+ }
+
test("batch mode - options should be handled as case-insensitive") {
verifyFieldsInBatch(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected,
batch => {
assert(expected.toLong === batch.pollTimeoutMs)
})
}
+ /*
+ the goal of this test is to verify the functionality of the aws msk IAM
auth
+ how this test works:
+ if testType contains source/sink, kafka is used as a source/sink option
respectively
+ if testType contains stream/batch, it is used either in readStream/read or
writeStream/write
+
+ In each case, we test that the library paths are discoverable since
+ if the library was not to be found, another error message would be thrown.
+ Although this broker exists, it does not have IAM capabilities and thus
+ it is expected that a timeout error will be thrown.
+ */
+ Seq("source and stream", "sink and stream",
+ "source and batch", "sink and batch").foreach { testType =>
+ test(s"test MSK IAM auth on kafka '$testType' side") {
+ val options: Map[String, String] = Map(
+ "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+ "subscribe" -> "msk-123",
+ "startingOffsets" -> "earliest",
+ "kafka.sasl.mechanism" -> "AWS_MSK_IAM",
+ "kafka.sasl.jaas.config" ->
+ "software.amazon.msk.auth.iam.IAMLoginModule required;",
+ "kafka.security.protocol" -> "SASL_SSL",
+ "kafka.sasl.client.callback.handler.class" ->
+ "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
+ "retries" -> "0"
+ )
+
+ testUtils.createTopic(options("subscribe"))
+
+ var e: Throwable = null
+ if (testType.contains("stream")) {
+ if (testType.contains("source")) {
+ e = intercept[StreamingQueryException] {
+ spark.readStream.format("kafka").options(options).load()
+ .writeStream.format("console").start().processAllAvailable()
+ }
+ TestUtils.assertExceptionMsg(e, "Timed out waiting for a node
assignment")
+ } else {
+ e = intercept[StreamingQueryException] {
+ spark.readStream.format("rate").option("rowsPerSecond", 10).load()
+ .withColumn("value", col("value").cast(StringType)).writeStream
+ .format("kafka").options(options).option("checkpointLocation",
"temp/testing")
+ .option("topic",
options("subscribe")).start().processAllAvailable()
+ }
+ TestUtils.assertExceptionMsg(e, s"TimeoutException: Topic
${options("subscribe")} " +
Review Comment:
^ Read above for clarification. This is essentially the same error, though
raised by the producer: even though the topic exists, the producer doesn't find it
in the metadata, since the node doesn't offer that mode of authentication.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]