gaborgsomogyi commented on a change in pull request #31944:
URL: https://github.com/apache/spark/pull/31944#discussion_r613997289
##########
File path:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
CheckNewAnswer(32, 33, 34, 35, 36)
)
}
+
+
+ test("test custom metrics - with rate limit") {
+ import testImplicits._
+
+ val topic = newTopic()
+ val data = 1 to 10
+ testUtils.createTopic(topic, partitions = 2)
+ testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+ testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+ .option("maxOffsetsPerTrigger", 1)
+ .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+ .load()
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ .map(_.toInt)
+
+ testStream(kafka)(
+ StartStream(),
+ makeSureGetOffsetCalled,
+ CheckAnswer(data: _*),
+ Execute { query => {
Review comment:
Not sure why the curly brace at the end is needed?
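For illustration, a single brace should be enough since `Execute` already takes a function block (just a sketch, with the assertion body elided):
```
Execute { query =>
  // assertions on query.recentProgress ...
}
```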
##########
File path:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
CheckNewAnswer(32, 33, 34, 35, 36)
)
}
+
Review comment:
Nit: -1 newline needed.
##########
File path:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
CheckNewAnswer(32, 33, 34, 35, 36)
)
}
+
+
+ test("test custom metrics - with rate limit") {
+ import testImplicits._
+
+ val topic = newTopic()
+ val data = 1 to 10
+ testUtils.createTopic(topic, partitions = 2)
+ testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+ testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+ .option("maxOffsetsPerTrigger", 1)
+ .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+ .load()
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ .map(_.toInt)
+
+ testStream(kafka)(
+ StartStream(),
+ makeSureGetOffsetCalled,
+ CheckAnswer(data: _*),
+ Execute { query => {
+ // The rate limit is 1, so there must be some delay in offsets per partition.
+ val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+ progress =>
+ !progress.metrics.isEmpty && {
+ // find the metrics that has non-zero average offsetsBehindLatest greater than 0.
+ progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+ }
+ }
+ assert(progressWithDelay.nonEmpty)
+ val metrics = progressWithDelay.get.metrics
+ assert(metrics.keySet() ==
+ Set("minOffsetsBehindLatest",
+ "maxOffsetsBehindLatest",
+ "avgOffsetsBehindLatest").asJava)
+ assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+ assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+ assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+ }}
+ )
+ }
+
+ test("test custom metrics - no rate limit") {
+ import testImplicits._
+
+ val topic = newTopic()
+ val data = 1 to 10
+ testUtils.createTopic(topic, partitions = 2)
+ testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+ testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+ .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+ .load()
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ .map(_.toInt)
+
+ testStream(kafka)(
+ StartStream(),
+ makeSureGetOffsetCalled,
+ CheckAnswer(data: _*),
+ Execute { query => {
+ val progress = query.recentProgress.map(_.sources.head).lastOption
+ assert(progress.nonEmpty)
+ val metrics = progress.get.metrics
+ // When there is no rate limit, there shouldn't be any delay in the current stream.
+ assert(metrics.keySet() ==
+ Set("minOffsetsBehindLatest",
+ "maxOffsetsBehindLatest",
+ "avgOffsetsBehindLatest").asJava)
+ assert(metrics.get("minOffsetsBehindLatest").toLong == 0)
+ assert(metrics.get("maxOffsetsBehindLatest").toLong == 0)
+ assert(metrics.get("avgOffsetsBehindLatest").toDouble == 0)
+ }}
+ )
+ }
+
+ test("test custom metrics - corner cases") {
+ val topic1 = new TopicPartition(newTopic(), 0)
+ val topic2 = new TopicPartition(newTopic(), 0)
+ val latestOffset = Map[TopicPartition, Long]((topic1, 3L), (topic2, 6L))
+
+ // test empty offset.
+ assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(null), latestOffset) ==
+ Map[String, String]().asJava)
+
+ // test empty offsetsBehindLatest && topics are missing in the latestConsumedOffset.
+ val emptyOffset = KafkaSourceOffset(Map[TopicPartition, Long]())
+ assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(emptyOffset), latestOffset) ==
+ Map[String, String]().asJava)
+
+ // test valid offsetsBehindLatest
+ val offset = KafkaSourceOffset(Map[TopicPartition, Long]((topic1, 1L), (topic2, 2L)))
+ assert(
+ KafkaMicroBatchStream.metrics(Optional.ofNullable(offset), latestOffset) ==
+ Map[String, String](
+ "minOffsetsBehindLatest" -> "2",
+ "maxOffsetsBehindLatest" -> "4",
+ "avgOffsetsBehindLatest" -> "3.0").asJava)
+
+ // test a topic is missing in both the latestConsumedOffset and latestAvailableOffset.
+ val topic3 = new TopicPartition(newTopic(), 0)
+ val offset2 =
+ KafkaSourceOffset(Map[TopicPartition, Long]((topic1, 1L), (topic3, 2L)))
+ val latestAvailableOffsets = Map[TopicPartition, Long]((topic2, 3L), (topic3, 6L))
+ assert(
+ KafkaMicroBatchStream.metrics(Optional.ofNullable(offset2), latestAvailableOffsets) ==
Review comment:
Triple equals.
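For example, the earlier assertion on the valid offsetsBehindLatest case could read like this (just a sketch; `===` is the ScalaTest operator that produces a descriptive diff on failure):
```
assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(offset), latestOffset) ===
  Map[String, String](
    "minOffsetsBehindLatest" -> "2",
    "maxOffsetsBehindLatest" -> "4",
    "avgOffsetsBehindLatest" -> "3.0").asJava)
```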
##########
File path:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
CheckNewAnswer(32, 33, 34, 35, 36)
)
}
+
+
+ test("test custom metrics - with rate limit") {
+ import testImplicits._
+
+ val topic = newTopic()
+ val data = 1 to 10
+ testUtils.createTopic(topic, partitions = 2)
+ testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+ testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+ .option("maxOffsetsPerTrigger", 1)
+ .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+ .load()
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ .map(_.toInt)
+
+ testStream(kafka)(
+ StartStream(),
+ makeSureGetOffsetCalled,
+ CheckAnswer(data: _*),
+ Execute { query => {
+ // The rate limit is 1, so there must be some delay in offsets per partition.
+ val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+ progress =>
+ !progress.metrics.isEmpty && {
+ // find the metrics that has non-zero average offsetsBehindLatest greater than 0.
+ progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+ }
+ }
+ assert(progressWithDelay.nonEmpty)
+ val metrics = progressWithDelay.get.metrics
+ assert(metrics.keySet() ==
+ Set("minOffsetsBehindLatest",
+ "maxOffsetsBehindLatest",
+ "avgOffsetsBehindLatest").asJava)
+ assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+ assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+ assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+ }}
+ )
+ }
+
+ test("test custom metrics - no rate limit") {
+ import testImplicits._
+
+ val topic = newTopic()
+ val data = 1 to 10
+ testUtils.createTopic(topic, partitions = 2)
+ testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+ testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+ .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+ .load()
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ .map(_.toInt)
+
+ testStream(kafka)(
+ StartStream(),
+ makeSureGetOffsetCalled,
+ CheckAnswer(data: _*),
+ Execute { query => {
+ val progress = query.recentProgress.map(_.sources.head).lastOption
+ assert(progress.nonEmpty)
+ val metrics = progress.get.metrics
+ // When there is no rate limit, there shouldn't be any delay in the current stream.
+ assert(metrics.keySet() ==
+ Set("minOffsetsBehindLatest",
+ "maxOffsetsBehindLatest",
+ "avgOffsetsBehindLatest").asJava)
+ assert(metrics.get("minOffsetsBehindLatest").toLong == 0)
+ assert(metrics.get("maxOffsetsBehindLatest").toLong == 0)
+ assert(metrics.get("avgOffsetsBehindLatest").toDouble == 0)
+ }}
+ )
+ }
+
+ test("test custom metrics - corner cases") {
+ val topic1 = new TopicPartition(newTopic(), 0)
+ val topic2 = new TopicPartition(newTopic(), 0)
+ val latestOffset = Map[TopicPartition, Long]((topic1, 3L), (topic2, 6L))
+
+ // test empty offset.
+ assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(null), latestOffset) ==
+ Map[String, String]().asJava)
+
+ // test empty offsetsBehindLatest && topics are missing in the latestConsumedOffset.
+ val emptyOffset = KafkaSourceOffset(Map[TopicPartition, Long]())
+ assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(emptyOffset), latestOffset) ==
Review comment:
`isEmpty` would be simpler, right?
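i.e. roughly this (a sketch; `metrics` returns a `java.util.Map`, so `isEmpty` avoids comparing against an empty map literal):
```
assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(emptyOffset), latestOffset).isEmpty)
```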
##########
File path:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
CheckNewAnswer(32, 33, 34, 35, 36)
)
}
+
+
+ test("test custom metrics - with rate limit") {
+ import testImplicits._
+
+ val topic = newTopic()
+ val data = 1 to 10
+ testUtils.createTopic(topic, partitions = 2)
+ testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+ testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+ .option("maxOffsetsPerTrigger", 1)
+ .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+ .load()
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ .map(_.toInt)
+
+ testStream(kafka)(
+ StartStream(),
+ makeSureGetOffsetCalled,
+ CheckAnswer(data: _*),
+ Execute { query => {
+ // The rate limit is 1, so there must be some delay in offsets per partition.
+ val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+ progress =>
+ !progress.metrics.isEmpty && {
+ // find the metrics that has non-zero average offsetsBehindLatest greater than 0.
+ progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+ }
+ }
+ assert(progressWithDelay.nonEmpty)
+ val metrics = progressWithDelay.get.metrics
+ assert(metrics.keySet() ==
+ Set("minOffsetsBehindLatest",
+ "maxOffsetsBehindLatest",
+ "avgOffsetsBehindLatest").asJava)
+ assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+ assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+ assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+ }}
+ )
+ }
+
+ test("test custom metrics - no rate limit") {
+ import testImplicits._
+
+ val topic = newTopic()
+ val data = 1 to 10
+ testUtils.createTopic(topic, partitions = 2)
+ testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+ testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+ .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+ .load()
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ .map(_.toInt)
+
+ testStream(kafka)(
+ StartStream(),
+ makeSureGetOffsetCalled,
+ CheckAnswer(data: _*),
+ Execute { query => {
Review comment:
Not sure why the curly brace at the end is needed?
##########
File path:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
CheckNewAnswer(32, 33, 34, 35, 36)
)
}
+
+
+ test("test custom metrics - with rate limit") {
+ import testImplicits._
+
+ val topic = newTopic()
+ val data = 1 to 10
+ testUtils.createTopic(topic, partitions = 2)
+ testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+ testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+ .option("maxOffsetsPerTrigger", 1)
+ .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+ .load()
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ .map(_.toInt)
+
+ testStream(kafka)(
+ StartStream(),
+ makeSureGetOffsetCalled,
+ CheckAnswer(data: _*),
+ Execute { query => {
+ // The rate limit is 1, so there must be some delay in offsets per partition.
+ val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+ progress =>
+ !progress.metrics.isEmpty && {
+ // find the metrics that has non-zero average offsetsBehindLatest greater than 0.
+ progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+ }
+ }
+ assert(progressWithDelay.nonEmpty)
+ val metrics = progressWithDelay.get.metrics
+ assert(metrics.keySet() ==
+ Set("minOffsetsBehindLatest",
+ "maxOffsetsBehindLatest",
+ "avgOffsetsBehindLatest").asJava)
+ assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+ assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+ assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+ }}
+ )
+ }
+
+ test("test custom metrics - no rate limit") {
+ import testImplicits._
+
+ val topic = newTopic()
+ val data = 1 to 10
+ testUtils.createTopic(topic, partitions = 2)
+ testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+ testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+ .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+ .load()
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ .map(_.toInt)
+
+ testStream(kafka)(
+ StartStream(),
+ makeSureGetOffsetCalled,
+ CheckAnswer(data: _*),
+ Execute { query => {
+ val progress = query.recentProgress.map(_.sources.head).lastOption
+ assert(progress.nonEmpty)
+ val metrics = progress.get.metrics
+ // When there is no rate limit, there shouldn't be any delay in the current stream.
+ assert(metrics.keySet() ==
+ Set("minOffsetsBehindLatest",
+ "maxOffsetsBehindLatest",
+ "avgOffsetsBehindLatest").asJava)
+ assert(metrics.get("minOffsetsBehindLatest").toLong == 0)
+ assert(metrics.get("maxOffsetsBehindLatest").toLong == 0)
+ assert(metrics.get("avgOffsetsBehindLatest").toDouble == 0)
+ }}
+ )
+ }
+
+ test("test custom metrics - corner cases") {
+ val topic1 = new TopicPartition(newTopic(), 0)
+ val topic2 = new TopicPartition(newTopic(), 0)
+ val latestOffset = Map[TopicPartition, Long]((topic1, 3L), (topic2, 6L))
+
+ // test empty offset.
+ assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(null), latestOffset) ==
Review comment:
`isEmpty` would be simpler, right?
##########
File path:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
CheckNewAnswer(32, 33, 34, 35, 36)
)
}
+
+
+ test("test custom metrics - with rate limit") {
+ import testImplicits._
+
+ val topic = newTopic()
+ val data = 1 to 10
+ testUtils.createTopic(topic, partitions = 2)
+ testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+ testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+ .option("maxOffsetsPerTrigger", 1)
+ .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+ .load()
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ .map(_.toInt)
+
+ testStream(kafka)(
+ StartStream(),
+ makeSureGetOffsetCalled,
+ CheckAnswer(data: _*),
+ Execute { query => {
+ // The rate limit is 1, so there must be some delay in offsets per partition.
+ val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+ progress =>
+ !progress.metrics.isEmpty && {
+ // find the metrics that has non-zero average offsetsBehindLatest greater than 0.
+ progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+ }
+ }
+ assert(progressWithDelay.nonEmpty)
+ val metrics = progressWithDelay.get.metrics
+ assert(metrics.keySet() ==
+ Set("minOffsetsBehindLatest",
+ "maxOffsetsBehindLatest",
+ "avgOffsetsBehindLatest").asJava)
+ assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+ assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+ assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+ }}
+ )
+ }
+
+ test("test custom metrics - no rate limit") {
+ import testImplicits._
+
+ val topic = newTopic()
+ val data = 1 to 10
+ testUtils.createTopic(topic, partitions = 2)
+ testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+ testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+ .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+ .load()
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ .map(_.toInt)
+
+ testStream(kafka)(
+ StartStream(),
+ makeSureGetOffsetCalled,
+ CheckAnswer(data: _*),
+ Execute { query => {
+ val progress = query.recentProgress.map(_.sources.head).lastOption
+ assert(progress.nonEmpty)
+ val metrics = progress.get.metrics
+ // When there is no rate limit, there shouldn't be any delay in the current stream.
+ assert(metrics.keySet() ==
+ Set("minOffsetsBehindLatest",
+ "maxOffsetsBehindLatest",
+ "avgOffsetsBehindLatest").asJava)
+ assert(metrics.get("minOffsetsBehindLatest").toLong == 0)
+ assert(metrics.get("maxOffsetsBehindLatest").toLong == 0)
+ assert(metrics.get("avgOffsetsBehindLatest").toDouble == 0)
+ }}
+ )
+ }
+
+ test("test custom metrics - corner cases") {
+ val topic1 = new TopicPartition(newTopic(), 0)
Review comment:
These are not topics but topicPartitions, right?
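Something like this would make the intent clearer (just a naming sketch, the test logic stays the same):
```
val topicPartition1 = new TopicPartition(newTopic(), 0)
val topicPartition2 = new TopicPartition(newTopic(), 0)
val latestOffset = Map[TopicPartition, Long]((topicPartition1, 3L), (topicPartition2, 6L))
```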
##########
File path:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -218,3 +226,37 @@ private[kafka010] class KafkaMicroBatchStream(
}
}
}
+
+object KafkaMicroBatchStream extends Logging {
+
+ /**
+ * Compute the difference of offset per partition between latestAvailablePartitionOffsets
+ * and latestConsumedPartitionOffsets.
+ * Because of rate limit, latest consumed offset per partition can be smaller than
+ * the latest available offset per partition.
+ * @param latestConsumedOffset latest consumed offset
+ * @param latestAvailablePartitionOffsets latest available offset per partition
+ * @return the generated metrics map
+ */
+ def metrics(
+ latestConsumedOffset: Optional[Offset],
+ latestAvailablePartitionOffsets: PartitionOffsetMap): ju.Map[String, String] = {
+ val offset = Option(latestConsumedOffset.orElse(null))
+
+ if (offset.nonEmpty) {
+ val consumedPartitionOffsets = offset.map(KafkaSourceOffset(_)).get.partitionToOffsets
+ val offsetsBehindLatest = latestAvailablePartitionOffsets
+ .filter(partitionOffset => consumedPartitionOffsets.contains(partitionOffset._1))
+ .map(partitionOffset =>
Review comment:
This can be simplified, and there is no need to break the line:
```
.map(partitionOffset => partitionOffset._2 - consumedPartitionOffsets(partitionOffset._1))
```
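A pattern-match form would be equally compact if that reads better (a sketch with the same assumed behavior; `tp` and `latestAvailable` are illustrative names):
```
.map { case (tp, latestAvailable) => latestAvailable - consumedPartitionOffsets(tp) }
```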
##########
File path:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
CheckNewAnswer(32, 33, 34, 35, 36)
)
}
+
+
+ test("test custom metrics - with rate limit") {
+ import testImplicits._
+
+ val topic = newTopic()
+ val data = 1 to 10
+ testUtils.createTopic(topic, partitions = 2)
+ testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+ testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+ .option("maxOffsetsPerTrigger", 1)
+ .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+ .load()
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ .map(_.toInt)
+
+ testStream(kafka)(
+ StartStream(),
+ makeSureGetOffsetCalled,
+ CheckAnswer(data: _*),
+ Execute { query => {
+ // The rate limit is 1, so there must be some delay in offsets per partition.
+ val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+ progress =>
+ !progress.metrics.isEmpty && {
+ // find the metrics that has non-zero average offsetsBehindLatest greater than 0.
+ progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+ }
+ }
+ assert(progressWithDelay.nonEmpty)
+ val metrics = progressWithDelay.get.metrics
+ assert(metrics.keySet() ==
+ Set("minOffsetsBehindLatest",
+ "maxOffsetsBehindLatest",
+ "avgOffsetsBehindLatest").asJava)
+ assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+ assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+ assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+ }}
+ )
+ }
+
+ test("test custom metrics - no rate limit") {
+ import testImplicits._
+
+ val topic = newTopic()
+ val data = 1 to 10
+ testUtils.createTopic(topic, partitions = 2)
+ testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+ testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+ .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+ .load()
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ .map(_.toInt)
+
+ testStream(kafka)(
+ StartStream(),
+ makeSureGetOffsetCalled,
+ CheckAnswer(data: _*),
+ Execute { query => {
+ val progress = query.recentProgress.map(_.sources.head).lastOption
+ assert(progress.nonEmpty)
+ val metrics = progress.get.metrics
+ // When there is no rate limit, there shouldn't be any delay in the current stream.
+ assert(metrics.keySet() ==
+ Set("minOffsetsBehindLatest",
+ "maxOffsetsBehindLatest",
+ "avgOffsetsBehindLatest").asJava)
+ assert(metrics.get("minOffsetsBehindLatest").toLong == 0)
Review comment:
Is there a specific reason not to use triple equals? Normally we prefer that in Scala.
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
##########
@@ -138,31 +138,21 @@ class StreamingQueryProgress private[sql](
override def toString: String = prettyJson
private[sql] def jsonValue: JValue = {
- def safeDoubleToJValue(value: Double): JValue = {
- if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
- }
-
- /** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
- def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
- if (map.isEmpty) return JNothing
- val keys = map.asScala.keySet.toSeq.sorted
- keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
- }
-
("id" -> JString(id.toString)) ~
("runId" -> JString(runId.toString)) ~
("name" -> JString(name)) ~
("timestamp" -> JString(timestamp)) ~
("batchId" -> JInt(batchId)) ~
("numInputRows" -> JInt(numInputRows)) ~
- ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
- ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
- ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
- ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
+ ("inputRowsPerSecond" ->
SafeJsonSerializer.safeDoubleToJValue(inputRowsPerSecond)) ~
Review comment:
This can be added to reduce the boilerplate code, right?
```
import org.apache.spark.sql.streaming.SafeJsonSerializer.safeDoubleToJValue
```
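With that import in place (plus a corresponding one for `safeMapToJValue`), the call sites could go back to the unqualified form the removed lines used, e.g.:
```
("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
```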
##########
File path:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
CheckNewAnswer(32, 33, 34, 35, 36)
)
}
+
+
+ test("test custom metrics - with rate limit") {
+ import testImplicits._
+
+ val topic = newTopic()
+ val data = 1 to 10
+ testUtils.createTopic(topic, partitions = 2)
+ testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+ testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+ .option("maxOffsetsPerTrigger", 1)
+ .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+ .load()
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ .map(_.toInt)
+
+ testStream(kafka)(
+ StartStream(),
+ makeSureGetOffsetCalled,
+ CheckAnswer(data: _*),
+ Execute { query => {
+ // The rate limit is 1, so there must be some delay in offsets per partition.
+ val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
+ progress =>
+ !progress.metrics.isEmpty && {
+ // find the metrics that has non-zero average offsetsBehindLatest greater than 0.
+ progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
+ }
+ }
+ assert(progressWithDelay.nonEmpty)
+ val metrics = progressWithDelay.get.metrics
+ assert(metrics.keySet() ==
+ Set("minOffsetsBehindLatest",
+ "maxOffsetsBehindLatest",
+ "avgOffsetsBehindLatest").asJava)
+ assert(metrics.get("minOffsetsBehindLatest").toLong > 0)
+ assert(metrics.get("maxOffsetsBehindLatest").toLong > 0)
+ assert(metrics.get("avgOffsetsBehindLatest").toDouble > 0)
+ }}
+ )
+ }
+
+ test("test custom metrics - no rate limit") {
+ import testImplicits._
+
+ val topic = newTopic()
+ val data = 1 to 10
+ testUtils.createTopic(topic, partitions = 2)
+ testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+ testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+ .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+ .load()
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ .map(_.toInt)
+
+ testStream(kafka)(
+ StartStream(),
+ makeSureGetOffsetCalled,
+ CheckAnswer(data: _*),
+ Execute { query => {
+ val progress = query.recentProgress.map(_.sources.head).lastOption
+ assert(progress.nonEmpty)
+ val metrics = progress.get.metrics
+ // When there is no rate limit, there shouldn't be any delay in the current stream.
+ assert(metrics.keySet() ==
+ Set("minOffsetsBehindLatest",
+ "maxOffsetsBehindLatest",
+ "avgOffsetsBehindLatest").asJava)
+ assert(metrics.get("minOffsetsBehindLatest").toLong == 0)
+ assert(metrics.get("maxOffsetsBehindLatest").toLong == 0)
+ assert(metrics.get("avgOffsetsBehindLatest").toDouble == 0)
+ }}
+ )
+ }
+
+ test("test custom metrics - corner cases") {
+ val topic1 = new TopicPartition(newTopic(), 0)
+ val topic2 = new TopicPartition(newTopic(), 0)
+ val latestOffset = Map[TopicPartition, Long]((topic1, 3L), (topic2, 6L))
+
+ // test empty offset.
+ assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(null), latestOffset) ==
+ Map[String, String]().asJava)
+
+ // test empty offsetsBehindLatest && topics are missing in the latestConsumedOffset.
+ val emptyOffset = KafkaSourceOffset(Map[TopicPartition, Long]())
+ assert(KafkaMicroBatchStream.metrics(Optional.ofNullable(emptyOffset), latestOffset) ==
+ Map[String, String]().asJava)
+
+ // test valid offsetsBehindLatest
+ val offset = KafkaSourceOffset(Map[TopicPartition, Long]((topic1, 1L), (topic2, 2L)))
+ assert(
+ KafkaMicroBatchStream.metrics(Optional.ofNullable(offset), latestOffset) ==
+ Map[String, String](
+ "minOffsetsBehindLatest" -> "2",
+ "maxOffsetsBehindLatest" -> "4",
+ "avgOffsetsBehindLatest" -> "3.0").asJava)
+
+ // test a topic is missing in both the latestConsumedOffset and latestAvailableOffset.
+ val topic3 = new TopicPartition(newTopic(), 0)
Review comment:
This is not a topic but a topicPartition, right?
##########
File path:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -218,3 +226,37 @@ private[kafka010] class KafkaMicroBatchStream(
}
}
}
+
+object KafkaMicroBatchStream extends Logging {
+
+ /**
+ * Compute the difference of offset per partition between latestAvailablePartitionOffsets
+ * and latestConsumedPartitionOffsets.
+ * Because of rate limit, latest consumed offset per partition can be smaller than
+ * the latest available offset per partition.
+ * @param latestConsumedOffset latest consumed offset
+ * @param latestAvailablePartitionOffsets latest available offset per partition
+ * @return the generated metrics map
+ */
+ def metrics(
+ latestConsumedOffset: Optional[Offset],
+ latestAvailablePartitionOffsets: PartitionOffsetMap): ju.Map[String, String] = {
+ val offset = Option(latestConsumedOffset.orElse(null))
+
+ if (offset.nonEmpty) {
+ val consumedPartitionOffsets = offset.map(KafkaSourceOffset(_)).get.partitionToOffsets
Review comment:
Had a deeper look, and this maps to `None` and not `Some(null)`, so this is fine.
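For reference, a minimal standalone check of that behavior (`Option.apply` maps `null` to `None`, so the later `map` is simply skipped):
```
import java.util.Optional

val noOffset: Optional[String] = Optional.empty[String]()
// Option(null) is None, not Some(null), so no NPE is possible here.
assert(Option(noOffset.orElse(null)).isEmpty)
assert(Option(noOffset.orElse(null)).map(_.length).isEmpty)
```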
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
##########
@@ -138,31 +138,21 @@ class StreamingQueryProgress private[sql](
override def toString: String = prettyJson
private[sql] def jsonValue: JValue = {
- def safeDoubleToJValue(value: Double): JValue = {
- if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
- }
-
- /** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
- def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
- if (map.isEmpty) return JNothing
- val keys = map.asScala.keySet.toSeq.sorted
- keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _)
- }
-
("id" -> JString(id.toString)) ~
("runId" -> JString(runId.toString)) ~
("name" -> JString(name)) ~
("timestamp" -> JString(timestamp)) ~
("batchId" -> JInt(batchId)) ~
("numInputRows" -> JInt(numInputRows)) ~
- ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
- ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
- ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
- ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
+ ("inputRowsPerSecond" ->
SafeJsonSerializer.safeDoubleToJValue(inputRowsPerSecond)) ~
+ ("processedRowsPerSecond" ->
SafeJsonSerializer.safeDoubleToJValue(processedRowsPerSecond)) ~
+ ("durationMs" -> SafeJsonSerializer.safeMapToJValue[JLong](durationMs, v
=> JInt(v.toLong))) ~
+ ("eventTime" -> SafeJsonSerializer.safeMapToJValue[String](eventTime, s =>
JString(s))) ~
("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
("sink" -> sink.jsonValue) ~
- ("observedMetrics" -> safeMapToJValue[Row](observedMetrics, row =>
row.jsonValue))
+ ("observedMetrics" ->
+ SafeJsonSerializer.safeMapToJValue[Row](observedMetrics, row =>
row.jsonValue))
Review comment:
Same applies here.
##########
File path:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -1297,6 +1297,130 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
CheckNewAnswer(32, 33, 34, 35, 36)
)
}
+
+
+ test("test custom metrics - with rate limit") {
+ import testImplicits._
+
+ val topic = newTopic()
+ val data = 1 to 10
+ testUtils.createTopic(topic, partitions = 2)
+ testUtils.sendMessages(topic, (1 to 5).map(_.toString).toArray, Some(0))
+ testUtils.sendMessages(topic, (6 to 10).map(_.toString).toArray, Some(1))
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic)
+ .option("maxOffsetsPerTrigger", 1)
+ .option(STARTING_OFFSETS_OPTION_KEY, "earliest")
+ .load()
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ .map(_.toInt)
+
+ testStream(kafka)(
+ StartStream(),
+ makeSureGetOffsetCalled,
+ CheckAnswer(data: _*),
+ Execute { query => {
+ // The rate limit is 1, so there must be some delay in offsets per partition.
+ val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find {
Review comment:
This can be simplified like:
```
val progressWithDelay = query.recentProgress.map(_.sources.head).reverse.find { progress =>
  // find the metrics that has non-zero average offsetsBehindLatest greater than 0.
  !progress.metrics.isEmpty && progress.metrics.get("avgOffsetsBehindLatest").toDouble > 0
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]