Github user vackosar commented on a diff in the pull request:
https://github.com/apache/spark/pull/22143#discussion_r211381706
--- Diff:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
---
@@ -116,3 +133,66 @@ class KafkaStreamDataWriter(
}
}
}
+
+private[kafka010] case class KafkaWriterCustomMetrics(
+ minOffset: KafkaSourceOffset,
+ maxOffset: KafkaSourceOffset) extends CustomMetrics {
+ override def json(): String = {
+ val jsonVal = ("minOffset" -> parse(minOffset.json)) ~
+ ("maxOffset" -> parse(maxOffset.json))
+ compact(render(jsonVal))
+ }
+
+ override def toString: String = json()
+}
+
+private[kafka010] object KafkaWriterCustomMetrics {
+
+ import Math.{min, max}
+
+ def apply(messages: Array[WriterCommitMessage]):
KafkaWriterCustomMetrics = {
+ val minMax = collate(messages)
+ KafkaWriterCustomMetrics(minMax._1, minMax._2)
+ }
+
+ private def collate(messages: Array[WriterCommitMessage]):
--- End diff --
Thanks, I will rename to something with minMax.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]