HeartSaVioR commented on a change in pull request #29729:
URL: https://github.com/apache/spark/pull/29729#discussion_r531554092



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
##########
@@ -0,0 +1,572 @@
+/*

Review comment:
       NOTE to reviewers:
   
   This file is identical to the commit just before the huge change, except for the diff caused by the newly introduced `KafkaOffsetReaderBase`.
   
   
https://github.com/gaborgsomogyi/spark/blob/d0e4c63018beb4e0d36bfea1753c3e5d8f13659e/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
##########
@@ -0,0 +1,613 @@
+/*

Review comment:
       NOTE to reviewers:
   
   This file is identical to `KafkaOffsetReader` on the latest master branch, except for the diff caused by the newly introduced `KafkaOffsetReaderBase`.
   
   
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
##########
@@ -66,74 +70,124 @@ class KafkaOffsetReaderSuite extends QueryTest with SharedSparkSession with Kafk
     )
   }
 
+  test("isolationLevel must give back default isolation level when not set") {
+    testIsolationLevel(None,
+      
IsolationLevel.valueOf(ConsumerConfig.DEFAULT_ISOLATION_LEVEL.toUpperCase(Locale.ROOT)))
+  }
+
+  test("isolationLevel must give back READ_UNCOMMITTED when set") {
+    testIsolationLevel(Some("read_uncommitted"), 
IsolationLevel.READ_UNCOMMITTED)
+  }
+
+  test("isolationLevel must give back READ_COMMITTED when set") {
+    testIsolationLevel(Some("read_committed"), IsolationLevel.READ_COMMITTED)
+  }
+
+  test("isolationLevel must throw exception when invalid isolation level set") 
{
+    intercept[IllegalArgumentException] {
+      testIsolationLevel(Some("intentionally_invalid"), 
IsolationLevel.READ_COMMITTED)
+    }
+  }
+
+  private def testIsolationLevel(kafkaParam: Option[String], isolationLevel: 
IsolationLevel) = {
+    var kafkaParams = Map(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> 
testUtils.brokerAddress)
+    kafkaParam.foreach(p => kafkaParams ++= 
Map(ConsumerConfig.ISOLATION_LEVEL_CONFIG -> p))
+    val reader = new KafkaOffsetReaderAdmin(
+      SubscribeStrategy(Seq()),
+      KafkaSourceProvider.kafkaParamsForDriver(kafkaParams),
+      CaseInsensitiveMap(Map.empty),
+      ""
+    )
+    assert(reader.isolationLevel === isolationLevel)
+  }
+
   test("SPARK-30656: getOffsetRangesFromUnresolvedOffsets - using specific 
offsets") {

Review comment:
       I'd rather follow the approach of creating tests dynamically.
   
   https://github.com/apache/spark/blob/e43255051c0a82713d653fe590fe7728e43556ce/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala#L53-L103
   
   The downside is that you cannot run a single test individually from the IDE, but at least each option value runs as its own test. If you instead loop over a `Seq` inside one test body, you can't tell which option, "true" or "false", made the test fail. A rough sketch of the pattern follows below.
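
A minimal, self-contained sketch of that dynamic-test pattern (the suite name, option name, and assertion are placeholders for illustration, not code from the PR or from the linked suite):

```scala
import org.scalatest.funsuite.AnyFunSuite

// Registers one test per option value instead of looping over a Seq inside a
// single test body, so a failure report names the exact value that broke.
class DynamicOptionSuite extends AnyFunSuite {
  Seq("true", "false").foreach { useNewOffsetFetcher =>
    test(s"isolation level handling (useNewOffsetFetcher = $useNewOffsetFetcher)") {
      // Placeholder assertion standing in for the real isolation-level checks.
      assert(Set("true", "false").contains(useNewOffsetFetcher))
    }
  }
}
```

The trade-off matches the comment above: generated tests are harder to launch one-by-one from an IDE, but each option value still runs and fails independently under an identifiable name.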




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
