[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...

2018-08-20 Thread vackosar
Github user vackosar commented on a diff in the pull request:

https://github.com/apache/spark/pull/22143#discussion_r211381706
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala ---
@@ -116,3 +133,66 @@ class KafkaStreamDataWriter(
     }
   }
 }
+
+private[kafka010] case class KafkaWriterCustomMetrics(
+    minOffset: KafkaSourceOffset,
+    maxOffset: KafkaSourceOffset) extends CustomMetrics {
+  override def json(): String = {
+    val jsonVal = ("minOffset" -> parse(minOffset.json)) ~
+      ("maxOffset" -> parse(maxOffset.json))
+    compact(render(jsonVal))
+  }
+
+  override def toString: String = json()
+}
+
+private[kafka010] object KafkaWriterCustomMetrics {
+
+  import Math.{min, max}
+
+  def apply(messages: Array[WriterCommitMessage]): KafkaWriterCustomMetrics = {
+    val minMax = collate(messages)
+    KafkaWriterCustomMetrics(minMax._1, minMax._2)
+  }
+
+  private def collate(messages: Array[WriterCommitMessage]):
--- End diff --

Thanks, I will rename to something with minMax.
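For illustration, a minimal sketch of what the renamed helper might look like inside the KafkaWriterCustomMetrics companion object, assuming KafkaSourceOffset exposes partitionToOffsets: Map[TopicPartition, Long]; the name minMaxOffsets and the body below are hypothetical, not the PR's final code:

    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

    // Hypothetical rename of `collate`: per topic-partition, keep the smallest
    // reported minimum and the largest reported maximum across commit messages.
    private def minMaxOffsets(
        messages: Array[WriterCommitMessage]): (KafkaSourceOffset, KafkaSourceOffset) = {
      val kafkaMessages = messages.collect { case m: KafkaWriterCommitMessage => m }
      val mins = kafkaMessages
        .flatMap(_.minOffset.partitionToOffsets)
        .groupBy(_._1)
        .mapValues(_.map(_._2).min)
      val maxs = kafkaMessages
        .flatMap(_.maxOffset.partitionToOffsets)
        .groupBy(_._1)
        .mapValues(_.map(_._2).max)
      (KafkaSourceOffset(mins.toMap), KafkaSourceOffset(maxs.toMap))
    }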


---




[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...

2018-08-20 Thread vackosar
Github user vackosar commented on a diff in the pull request:

https://github.com/apache/spark/pull/22143#discussion_r211379697
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala ---
@@ -19,18 +19,23 @@ package org.apache.spark.sql.kafka010
 
 import scala.collection.JavaConverters._
 
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
+import org.apache.spark.sql.sources.v2.CustomMetrics
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamWriter, SupportsCustomWriterMetrics}
 import org.apache.spark.sql.types.StructType
 
 /**
  * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
  * don't need to really send one.
  */
-case object KafkaWriterCommitMessage extends WriterCommitMessage
+case class KafkaWriterCommitMessage(minOffset: KafkaSourceOffset, maxOffset: KafkaSourceOffset)
--- End diff --

I would have to rename the class itself to avoid adding a duplicate class. I 
would love to do that; I am just not sure whether it would be accepted.


---




[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...

2018-08-20 Thread vackosar
Github user vackosar commented on a diff in the pull request:

https://github.com/apache/spark/pull/22143#discussion_r211379432
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
@@ -61,12 +63,30 @@ private[kafka010] class KafkaWriteTask(
 private[kafka010] abstract class KafkaRowWriter(
     inputSchema: Seq[Attribute], topic: Option[String]) {
 
+  import scala.collection.JavaConverters._
+
+  protected val minOffsetAccumulator: collection.concurrent.Map[TopicPartition, Long] =
+    new ConcurrentHashMap[TopicPartition, Long]().asScala
--- End diff --

This map is accessed concurrently from send callbacks for different 
partitions, as can be seen from the call hierarchy and the docs of Kafka's send method.
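For context, a minimal sketch of the access pattern in question, assuming Kafka's standard producer.send(record, callback) contract; the map names echo the diff, but the code below is illustrative only:

    import java.util.concurrent.ConcurrentHashMap
    import org.apache.kafka.clients.producer.{Callback, RecordMetadata}
    import org.apache.kafka.common.TopicPartition

    val minOffsets = new ConcurrentHashMap[TopicPartition, Long]()
    val maxOffsets = new ConcurrentHashMap[TopicPartition, Long]()

    // Kafka runs callbacks on the producer's I/O thread, and its docs only
    // guarantee callback ordering per partition, so this bookkeeping must be
    // safe against concurrent access across partitions and against the writer
    // thread that later reads the map; merge() updates each key atomically.
    val callback: Callback = new Callback {
      override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
        if (e == null) {
          val tp = new TopicPartition(metadata.topic, metadata.partition)
          minOffsets.merge(tp, metadata.offset, (a, b) => math.min(a, b))
          maxOffsets.merge(tp, metadata.offset, (a, b) => math.max(a, b))
        }
      }
    }
    // producer.send(record, callback)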


---




[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...

2018-08-20 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22143#discussion_r211339535
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
@@ -61,12 +63,30 @@ private[kafka010] class KafkaWriteTask(
 private[kafka010] abstract class KafkaRowWriter(
     inputSchema: Seq[Attribute], topic: Option[String]) {
 
+  import scala.collection.JavaConverters._
+
+  protected val minOffsetAccumulator: collection.concurrent.Map[TopicPartition, Long] =
+    new ConcurrentHashMap[TopicPartition, Long]().asScala
--- End diff --

Why is this a concurrent map?


---




[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...

2018-08-20 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22143#discussion_r211336988
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala ---
@@ -116,3 +133,66 @@ class KafkaStreamDataWriter(
     }
   }
 }
+
+private[kafka010] case class KafkaWriterCustomMetrics(
+    minOffset: KafkaSourceOffset,
+    maxOffset: KafkaSourceOffset) extends CustomMetrics {
+  override def json(): String = {
+    val jsonVal = ("minOffset" -> parse(minOffset.json)) ~
+      ("maxOffset" -> parse(maxOffset.json))
+    compact(render(jsonVal))
+  }
+
+  override def toString: String = json()
+}
+
+private[kafka010] object KafkaWriterCustomMetrics {
+
+  import Math.{min, max}
+
+  def apply(messages: Array[WriterCommitMessage]): KafkaWriterCustomMetrics = {
+    val minMax = collate(messages)
+    KafkaWriterCustomMetrics(minMax._1, minMax._2)
+  }
+
+  private def collate(messages: Array[WriterCommitMessage]):
--- End diff --

It would be good to leave a comment on what this does. It seems to be computing 
the min/max offset per partition? If so, choosing an apt name for that function 
would make it clearer.


---




[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...

2018-08-20 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22143#discussion_r211336368
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala ---
@@ -19,18 +19,23 @@ package org.apache.spark.sql.kafka010
 
 import scala.collection.JavaConverters._
 
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
+import org.apache.spark.sql.sources.v2.CustomMetrics
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamWriter, SupportsCustomWriterMetrics}
 import org.apache.spark.sql.types.StructType
 
 /**
  * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
  * don't need to really send one.
  */
-case object KafkaWriterCommitMessage extends WriterCommitMessage
+case class KafkaWriterCommitMessage(minOffset: KafkaSourceOffset, maxOffset: KafkaSourceOffset)
--- End diff --

It's kind of odd that the writer commit message includes a source offset. IMO, 
it would be better to define a `KafkaSinkOffset` or, if it can be made common, 
something like `KafkaOffsets`.
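For illustration, a minimal sketch of the suggested refactor, assuming a sink-neutral holder replaces the reuse of KafkaSourceOffset; the names KafkaOffsets, minOffsets, and maxOffsets are placeholders, not code from the PR:

    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

    // Hypothetical sink-neutral offsets holder: neither the commit message nor
    // the custom metrics would then need the source-side KafkaSourceOffset.
    private[kafka010] case class KafkaOffsets(partitionToOffsets: Map[TopicPartition, Long])

    private[kafka010] case class KafkaWriterCommitMessage(
        minOffsets: KafkaOffsets,
        maxOffsets: KafkaOffsets) extends WriterCommitMessage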


---




[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...

2018-08-19 Thread vackosar
GitHub user vackosar opened a pull request:

https://github.com/apache/spark/pull/22143

[SPARK-24647][SS] Report KafkaStreamWriter's written min and max offsets via CustomMetrics.

## What changes were proposed in this pull request?

Report KafkaStreamWriter's written min and max offsets via CustomMetrics. 
This is important for data lineage projects like Spline. Related issue: 
https://issues.apache.org/jira/browse/SPARK-24647
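
As a hedged usage sketch of how a lineage tool might observe these metrics (the broker address, topic, and checkpoint path below are placeholders, and exactly where the CustomMetrics JSON surfaces in the progress output depends on the SupportsCustomWriterMetrics plumbing from SPARK-24748):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[2]").appName("spark-24647-demo").getOrCreate()

    // Any streaming source works; the built-in rate source keeps this self-contained.
    val df = spark.readStream.format("rate").load()
      .selectExpr("CAST(value AS STRING) AS value")

    val query = df.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker
      .option("topic", "out")                               // placeholder topic
      .option("checkpointLocation", "/tmp/spark-24647-cp")  // placeholder path
      .start()

    // After a batch commits, the KafkaWriterCustomMetrics JSON, e.g.
    // {"minOffset":{"out":{"0":0}},"maxOffset":{"out":{"0":41}}}, should be
    // visible in the reported progress.
    query.recentProgress.foreach(p => println(p.json))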

## How was this patch tested?

Unit tests.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/AbsaOSS/spark feature/SPARK-24647-kafka-writer-offsets

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22143.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22143


commit 767a0156a1acef515974d48bbcb6cfdb17f68d90
Author: Kosar, Vaclav: Functions Transformation 
Date:   2018-08-17T13:31:57Z

[SPARK-24647][SS] Report KafkaStreamWriter's written min and max offsets via CustomMetrics.




---
