rangadi commented on code in PR #41791:
URL: https://github.com/apache/spark/pull/41791#discussion_r1251179371
##########
connector/kafka-0-10-sql/pom.xml:
##########
@@ -174,6 +174,12 @@
<scope>test</scope>
</dependency>
+ <dependency>
Review Comment:
Add a short comment about what this is for.
##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala:
##########
@@ -18,30 +18,108 @@
package org.apache.spark.sql.kafka010
import java.util.Locale
-
-import scala.collection.JavaConverters._
+import java.util.concurrent.ExecutionException
import org.mockito.Mockito.{mock, when}
-import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException, SparkFunSuite,
TestUtils}
+import org.apache.spark.sql.Row
import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.streaming.StreamingQueryException
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
-class KafkaSourceProviderSuite extends SparkFunSuite {
+class KafkaSourceProviderSuite extends SparkFunSuite with SharedSparkSession {
+ import scala.collection.JavaConverters._
private val expected = "1111"
+ protected var testUtils: KafkaTestUtils = _
+
override protected def afterEach(): Unit = {
- SparkEnv.set(null)
super.afterEach()
}
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ testUtils = new KafkaTestUtils
+ testUtils.setup()
+ }
+
+ override protected def afterAll(): Unit = {
+ try {
+ if (testUtils != null) {
+ testUtils.teardown()
+ testUtils = null
+ }
+ } finally {
+ super.afterAll()
+ }
+ }
+
test("batch mode - options should be handled as case-insensitive") {
verifyFieldsInBatch(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected,
batch => {
assert(expected.toLong === batch.pollTimeoutMs)
})
}
+ /* if testType contains source/sink, kafka is used as a source/sink option
respectively
Review Comment:
What is this about?
Could you add a brief description of the test here?
##########
docs/structured-streaming-kafka-integration.md:
##########
@@ -1171,3 +1171,14 @@ This can be done several ways. One possibility is to
provide additional JVM para
--driver-java-options
"-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \
--conf
spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf
\
...
+
+### IAM authentication
+
+Managed Streaming for Kafka (MSK) is a fully managed service offered by Amazon
Web Services (AWS) that simplifies the deployment, management, and scaling of
Apache Kafka clusters.
+One way to connect to MSK is by using IAM authentication. To facilitate this,
Spark already ships with the [aws msk
library](https://github.com/aws/aws-msk-iam-auth) by default so no additional
dependencies are required.
+To use IAM authentication, you can set the kafka config as in the [library
documentation](https://github.com/aws/aws-msk-iam-auth). For example:
+
+ --conf spark.kafka.clusters.${cluster}.kafka.security.protocol=SASL_SSL
Review Comment:
Where are these configs read?
Could you show them being set in a code snippet like `spark.readStream.format("kafka"). ...` instead?
##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala:
##########
@@ -18,30 +18,108 @@
package org.apache.spark.sql.kafka010
import java.util.Locale
-
-import scala.collection.JavaConverters._
+import java.util.concurrent.ExecutionException
import org.mockito.Mockito.{mock, when}
-import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException, SparkFunSuite,
TestUtils}
+import org.apache.spark.sql.Row
import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.streaming.StreamingQueryException
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
-class KafkaSourceProviderSuite extends SparkFunSuite {
+class KafkaSourceProviderSuite extends SparkFunSuite with SharedSparkSession {
+ import scala.collection.JavaConverters._
Review Comment:
Why is this moved inside?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]