yijiacui-db commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r615588004



##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        // The rate limit is 1, so there must be some delay in offsets per partition.
+        val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+          progress =>
+            !progress.metrics.isEmpty && {
+              // find the metrics that has non-zero average offsetsBehindLatest greater than 0.
+              progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+            }
+        }
+        assert(progressWithDelay.nonEmpty)
+        val metrics = progressWithDelay.get.metrics
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - no rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        val progress = query.recentProgress.map(_.sources.head).lastOption
+        assert(progress.nonEmpty)
+        val metrics = progress.get.metrics
+        // When there is no rate limit, there shouldn't be any delay in the current stream.
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble == 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - corner cases") {
+    val topic1 = new TopicPartition(newTopic(), 0)

Review comment:
       Changed.

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        // The rate limit is 1, so there must be some delay in offsets per partition.
+        val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+          progress =>
+            !progress.metrics.isEmpty && {
+              // find the metrics that has non-zero average offsetsBehindLatest greater than 0.
+              progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+            }
+        }
+        assert(progressWithDelay.nonEmpty)
+        val metrics = progressWithDelay.get.metrics
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - no rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        val progress = query.recentProgress.map(_.sources.head).lastOption
+        assert(progress.nonEmpty)
+        val metrics = progress.get.metrics
+        // When there is no rate limit, there shouldn't be any delay in the current stream.
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble == 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - corner cases") {
+    val topic1 = new TopicPartition(newTopic(), 0)
+    val topic2 = new TopicPartition(newTopic(), 0)
+    val latestOffset = Map[TopicPartition, Long]((topic1, 3L), (topic2, 6L))
+
+    // test empty offset.
+    assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(null), latestOffset) ==

Review comment:
       Done.

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
       CheckNewAnswer(32, 33, 34, 35, 36)
     )
   }
+
+
+  test("test custom metrics - with rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option("maxOffsetsPerTrigger", 1)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        // The rate limit is 1, so there must be some delay in offsets per partition.
+        val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+          progress =>
+            !progress.metrics.isEmpty && {
+              // find the metrics that has non-zero average offsetsBehindLatest greater than 0.
+              progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+            }
+        }
+        assert(progressWithDelay.nonEmpty)
+        val metrics = progressWithDelay.get.metrics
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - no rate limit") {
+    import testImplicits._
+
+    val topic = newTopic()
+    val data = 1 to 10
+    testUtils.createTopic(topic, partitions = 2)
+    testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", topic)
+      .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+      .map(_.toInt)
+
+    testStream(kafka)(
+      StartStream(),
+      makeSureGetOffsetCalled,
+      CheckAnswer(data: _*),
+      Execute { query => {
+        val progress = query.recentProgress.map(_.sources.head).lastOption
+        assert(progress.nonEmpty)
+        val metrics = progress.get.metrics
+        // When there is no rate limit, there shouldn't be any delay in the current stream.
+        assert(metrics.keySet() ==
+          Set("minOffsetsBehindLatest",
+            "maxOffsetsBehindLatest",
+            "avgOffsetsBehindLatest").asJava)
+        assert(metrics.get("minOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("maxOffsetsBehindLatest").toLong == 0)
+        assert(metrics.get("avgOffsetsBehindLatest").toDouble == 0)
+      }}
+    )
+  }
+
+  test("test custom metrics - corner cases") {
+    val topic1 = new TopicPartition(newTopic(), 0)
+    val topic2 = new TopicPartition(newTopic(), 0)
+    val latestOffset = Map[TopicPartition, Long]((topic1, 3L), (topic2, 6L))
+
+    // test empty offset.
+    assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(null), latestOffset) ==
+      Map[String, String]().asJava)
+
+    // test empty offsetsBehindLatest && topics are missing in the latestConsumedOffset.
+    val emptyOffset = KafkaSourceOffset(Map[TopicPartition, Long]())
+    assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(emptyOffset), latestOffset) ==

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to