dongjoon-hyun commented on a change in pull request #24967: [SPARK-28163][SS] Use CaseInsensitiveMap for KafkaOffsetReader
URL: https://github.com/apache/spark/pull/24967#discussion_r300764407
 
 

 ##########
 File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala
 ##########
 @@ -30,52 +30,100 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester {
 
+  private val expected = "1111"
   private val pollTimeoutMsMethod = PrivateMethod[Long]('pollTimeoutMs)
   private val maxOffsetsPerTriggerMethod = 
PrivateMethod[Option[Long]]('maxOffsetsPerTrigger)
+  private val offsetReaderMethod = 
PrivateMethod[KafkaOffsetReader]('offsetReader)
+  private val fetchOffsetNumRetriesMethod = 
PrivateMethod[Int]('fetchOffsetNumRetries)
+  private val fetchOffsetRetryIntervalMsMethod = 
PrivateMethod[Long]('fetchOffsetRetryIntervalMs)
 
   override protected def afterEach(): Unit = {
     SparkEnv.set(null)
     super.afterEach()
   }
 
+  test("batch mode - options should be handled as case-insensitive") {
+    verifyFieldsInBatch(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, 
batch => {
+      assert(expected.toLong === getField(batch, pollTimeoutMsMethod))
+    })
+  }
+
   test("micro-batch mode - options should be handled as case-insensitive") {
-    def verifyFieldsInMicroBatchStream(
-        options: CaseInsensitiveStringMap,
-        expectedPollTimeoutMs: Long,
-        expectedMaxOffsetsPerTrigger: Option[Long]): Unit = {
-      // KafkaMicroBatchStream reads Spark conf from SparkEnv for default value
-      // hence we set mock SparkEnv here before creating KafkaMicroBatchStream
-      val sparkEnv = mock(classOf[SparkEnv])
-      when(sparkEnv.conf).thenReturn(new SparkConf())
-      SparkEnv.set(sparkEnv)
+    verifyFieldsInMicroBatchStream(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, 
expected, stream => {
+      assert(expected.toLong === getField(stream, pollTimeoutMsMethod))
+    })
+    verifyFieldsInMicroBatchStream(KafkaSourceProvider.MAX_OFFSET_PER_TRIGGER, 
expected, stream => {
+      assert(Some(expected.toLong) === getField(stream, 
maxOffsetsPerTriggerMethod))
+    })
+  }
 
-      val scan = getKafkaDataSourceScan(options)
-      val stream = 
scan.toMicroBatchStream("dummy").asInstanceOf[KafkaMicroBatchStream]
+  test("SPARK-28163 - micro-batch mode - options should be handled as 
case-insensitive") {
 
 Review comment:
   Got it. In this case, +1 for that change. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to