[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress
Github user vackosar commented on the issue: https://github.com/apache/spark/pull/21919 @tdas, @gatorsmile and @cloud-fan, I just resolved the conflicts. Are you happy to merge, or do you have any suggestions? Please respond so that I can either merge or close this PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress
Github user vackosar commented on the issue: https://github.com/apache/spark/pull/21919 @tdas, @gatorsmile and @cloud-fan, I just resolved the conflicts. Are you happy to merge, or do you have any suggestions?
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress
Github user vackosar commented on the issue: https://github.com/apache/spark/pull/21919 @cloud-fan, apart from the conflicts, are you happy to merge?
[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...
Github user vackosar commented on the issue: https://github.com/apache/spark/pull/22143 @cloud-fan are you OK with merging the PR?
[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...
Github user vackosar commented on a diff in the pull request: https://github.com/apache/spark/pull/22143#discussion_r211381706 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala --- @@ -116,3 +133,66 @@ class KafkaStreamDataWriter( } } } + +private[kafka010] case class KafkaWriterCustomMetrics( +minOffset: KafkaSourceOffset, +maxOffset: KafkaSourceOffset) extends CustomMetrics { + override def json(): String = { +val jsonVal = ("minOffset" -> parse(minOffset.json)) ~ + ("maxOffset" -> parse(maxOffset.json)) +compact(render(jsonVal)) + } + + override def toString: String = json() +} + +private[kafka010] object KafkaWriterCustomMetrics { + + import Math.{min, max} + + def apply(messages: Array[WriterCommitMessage]): KafkaWriterCustomMetrics = { +val minMax = collate(messages) +KafkaWriterCustomMetrics(minMax._1, minMax._2) + } + + private def collate(messages: Array[WriterCommitMessage]): --- End diff -- Thanks, I will rename to something with minMax.
[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...
Github user vackosar commented on the issue: https://github.com/apache/spark/pull/22143 @arunmahadevan min and max are used because other writers, running in a different job, may write to the same topic. The messages sent would then be interleaved, and one would have to return a large number of intervals to be accurate. This approach gives sufficient information about where the data ended up being written, while also being resilient and simple. Would you recommend adding this as a Javadoc? To explain the motivation, I updated the description of this PR using the description from the Jira. (To track data lineage, we need to know where data was read from and written to, at least approximately.)
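The min/max reasoning above can be illustrated with a small sketch. It assumes each write task reports, per partition, the smallest and largest offset it was acknowledged for; `PartitionKey`, `OffsetRange` and `OffsetRangeSketch` are hypothetical stand-ins invented for this illustration and are not the classes from the PR. The batch-level result is one approximate range per partition rather than an exact list of intervals.

```scala
// Hypothetical stand-ins for illustration; these are not the Spark or Kafka classes.
final case class PartitionKey(topic: String, partition: Int)
final case class OffsetRange(min: Map[PartitionKey, Long], max: Map[PartitionKey, Long])

object OffsetRangeSketch {
  // Merge two per-partition offset maps; `pick` decides which offset wins on a clash.
  private def mergeWith(
      a: Map[PartitionKey, Long],
      b: Map[PartitionKey, Long])(pick: (Long, Long) => Long): Map[PartitionKey, Long] =
    b.foldLeft(a) { case (acc, (key, offset)) =>
      acc.updated(key, acc.get(key).map(pick(_, offset)).getOrElse(offset))
    }

  // Reduce the ranges reported by all tasks to a single range for the batch.
  def combine(ranges: Seq[OffsetRange]): OffsetRange =
    ranges.foldLeft(OffsetRange(Map.empty, Map.empty)) { (acc, r) =>
      OffsetRange(
        mergeWith(acc.min, r.min)((x, y) => math.min(x, y)),
        mergeWith(acc.max, r.max)((x, y) => math.max(x, y)))
    }
}
```

If another job writes to the same partition in the meantime, the range still brackets everything this batch wrote, although it may also cover foreign messages. That is the "approximate" part of the trade-off.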
[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...
Github user vackosar commented on a diff in the pull request: https://github.com/apache/spark/pull/22143#discussion_r211379697 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala --- @@ -19,18 +19,23 @@ package org.apache.spark.sql.kafka010 import scala.collection.JavaConverters._ +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery +import org.apache.spark.sql.sources.v2.CustomMetrics import org.apache.spark.sql.sources.v2.writer._ -import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.sql.sources.v2.writer.streaming.{StreamWriter, SupportsCustomWriterMetrics} import org.apache.spark.sql.types.StructType /** * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we * don't need to really send one. */ -case object KafkaWriterCommitMessage extends WriterCommitMessage +case class KafkaWriterCommitMessage(minOffset: KafkaSourceOffset, maxOffset: KafkaSourceOffset) --- End diff -- I would have to rename the class itself to avoid adding a duplicate class. I would love to do that; I am just not sure it would be accepted.
[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...
Github user vackosar commented on a diff in the pull request: https://github.com/apache/spark/pull/22143#discussion_r211379432 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala --- @@ -61,12 +63,30 @@ private[kafka010] class KafkaWriteTask( private[kafka010] abstract class KafkaRowWriter( inputSchema: Seq[Attribute], topic: Option[String]) { + import scala.collection.JavaConverters._ + + protected val minOffsetAccumulator: collection.concurrent.Map[TopicPartition, Long] = +new ConcurrentHashMap[TopicPartition, Long]().asScala --- End diff -- This map is accessed concurrently in callbacks for different partitions. This can be seen from the call hierarchy and the docs of Kafka's send method.
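As a rough illustration of why a concurrent map is wanted here, the sketch below keeps the smallest offset seen per partition and can be updated from several callback threads. It is not the PR's code: `MinOffsetTracker` and its string partition keys are hypothetical, and it relies only on `ConcurrentHashMap.merge`, which applies the update for a key atomically.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction

// Hypothetical helper; partitions are plain strings instead of Kafka's TopicPartition.
class MinOffsetTracker {
  private val minOffsets = new ConcurrentHashMap[String, java.lang.Long]()

  // Keeps the smaller of the stored offset and the newly reported one.
  private val keepSmaller = new BiFunction[java.lang.Long, java.lang.Long, java.lang.Long] {
    override def apply(a: java.lang.Long, b: java.lang.Long): java.lang.Long =
      if (a.longValue <= b.longValue) a else b
  }

  // Safe to call from several callback threads: merge updates one key atomically.
  def record(partition: String, offset: Long): Unit = {
    minOffsets.merge(partition, java.lang.Long.valueOf(offset), keepSmaller)
    ()
  }

  // Smallest offset recorded so far, or None if the partition was never written to.
  def minOffset(partition: String): Option[Long] =
    Option(minOffsets.get(partition)).map(_.longValue)
}
```

A separate get followed by put would not be enough under concurrency, because two callbacks could interleave between the read and the write; `merge` avoids that race.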
[GitHub] spark issue #22143: [SPARK-24647][SS] Report KafkaStreamWriter's written min...
Github user vackosar commented on the issue: https://github.com/apache/spark/pull/22143 @arunmahadevan @jose-torres @cloud-fan you may be interested in this one.
[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...
GitHub user vackosar opened a pull request: https://github.com/apache/spark/pull/22143
[SPARK-24647][SS] Report KafkaStreamWriter's written min and max offs…
…ets via CustomMetrics.
## What changes were proposed in this pull request?
Report KafkaStreamWriter's written min and max offsets via CustomMetrics. This is important for data lineage projects like Spline. Related issue: https://issues.apache.org/jira/browse/SPARK-24647
## How was this patch tested?
Unit tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/AbsaOSS/spark feature/SPARK-24647-kafka-writer-offsets
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/22143.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #22143
commit 767a0156a1acef515974d48bbcb6cfdb17f68d90
Author: Kosar, Vaclav: Functions Transformation
Date: 2018-08-17T13:31:57Z
[SPARK-24647][SS] Report KafkaStreamWriter's written min and max offsets via CustomMetrics.
[GitHub] spark pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkPro...
Github user vackosar commented on a diff in the pull request: https://github.com/apache/spark/pull/21919#discussion_r210708107 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -254,3 +259,10 @@ class SinkProgress protected[sql]( } } } + +private[sql] object SinkProgress { + val DEFAULT_NUM_OUTPUT_ROWS: Long = -1L --- End diff -- I will implement this for continuous streaming, and then only legacy sinks would output -1. I didn't want to change the API too often.
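To make the -1 convention concrete, here is a minimal sketch that assumes a sink which cannot report a count passes `None`. `SinkProgressSketch` and `Progress` are simplified, hypothetical stand-ins rather than Spark's `SinkProgress`, and the JSON is hand-rolled only for illustration.

```scala
object SinkProgressSketch {
  // Sentinel for sinks that cannot report a row count.
  val DefaultNumOutputRows: Long = -1L

  // Hypothetical simplified stand-in for SinkProgress; not Spark's class.
  final case class Progress(description: String, numOutputRows: Long) {
    // Hand-rolled JSON for illustration; it does not escape special characters.
    def json: String =
      s"""{"description":"$description","numOutputRows":$numOutputRows}"""
  }

  // A missing count falls back to the sentinel instead of changing the field type.
  def progress(description: String, numOutputRows: Option[Long]): Progress =
    Progress(description, numOutputRows.getOrElse(DefaultNumOutputRows))
}
```

Consumers of the progress events would then need to treat -1 as "unknown" rather than as a real count.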
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress
Github user vackosar commented on the issue: https://github.com/apache/spark/pull/21919 @cloud-fan happy to merge?
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress
Github user vackosar commented on the issue: https://github.com/apache/spark/pull/21919 @jose-torres are you happy to merge?
[GitHub] spark pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkPro...
Github user vackosar commented on a diff in the pull request: https://github.com/apache/spark/pull/21919#discussion_r209621895 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -198,11 +198,14 @@ class SourceProgress protected[sql]( * during a trigger. See [[StreamingQueryProgress]] for more information. * * @param description Description of the source corresponding to this status. + * @param numOutputRows Number of rows written to the sink or -1 for Continuous Mode (temporarily) + * or Sink V1 (until decommissioned). * @since 2.1.0 */ @InterfaceStability.Evolving class SinkProgress protected[sql]( -val description: String) extends Serializable { + val description: String, + val numOutputRows: Long) extends Serializable { --- End diff -- @zsxwing, what do you think?
[GitHub] spark pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkPro...
Github user vackosar commented on a diff in the pull request: https://github.com/apache/spark/pull/21919#discussion_r208902692 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -213,6 +216,12 @@ class SinkProgress protected[sql]( override def toString: String = prettyJson private[sql] def jsonValue: JValue = { -("description" -> JString(description)) +("description" -> JString(description)) ~ + ("numOutputRows" -> JInt(numOutputRows)) } } + +object SinkProgress { + def apply(description: String, numOutputRows: Option[Long]): SinkProgress = + new SinkProgress(description, numOutputRows.getOrElse(-1L)) --- End diff -- It would be good to implement indentation rules in scalastyle-config.xml. I am not sure how, but I could look into that in the future.
[GitHub] spark pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkPro...
Github user vackosar commented on a diff in the pull request: https://github.com/apache/spark/pull/21919#discussion_r208902157 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -198,11 +198,14 @@ class SourceProgress protected[sql]( * during a trigger. See [[StreamingQueryProgress]] for more information. * * @param description Description of the source corresponding to this status. + * @param numOutputRows Number of rows written to the sink or -1 for Continuous Mode (temporarily) + * or Sink V1 (until decommissioned). * @since 2.1.0 */ @InterfaceStability.Evolving class SinkProgress protected[sql]( -val description: String) extends Serializable { + val description: String, + val numOutputRows: Long) extends Serializable { --- End diff -- numOutputRows is used here to be similar to SourceProgress#numInputRows. I agree that I could change from SinkProgress#numOutputRows to SinkProgress#numInputRows, as it may make a bit more sense given what actually happens. Would you and others be in favor of that?
[GitHub] spark pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkPro...
Github user vackosar commented on a diff in the pull request: https://github.com/apache/spark/pull/21919#discussion_r208901343 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -58,6 +61,7 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e val useCommitCoordinator = writer.useCommitCoordinator val rdd = query.execute() val messages = new Array[WriterCommitMessage](rdd.partitions.length) +val totalNumRowsAccumulator = new LongAccumulator() --- End diff -- I am using the thread-safe native Java LongAccumulator here. A Spark cluster accumulator is not needed here and would also be a bit out of the ordinary in this area of the code base. Note that StreamingQueryProgress is reported as an event object. That is why I am not using the cluster accumulator here.
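The thread-safety point can be sketched with `java.util.concurrent.atomic.LongAccumulator`. The example assumes per-task row counts arrive on several threads within one JVM; `RowCountSketch` and `totalRows` are hypothetical names, and the explicit sum function and zero identity are choices made for this sketch, not details taken from the PR.

```scala
import java.util.concurrent.atomic.LongAccumulator
import java.util.function.LongBinaryOperator

object RowCountSketch {
  // Sums per-task row counts that are reported from several threads in one JVM.
  def totalRows(perTaskCounts: Seq[Long]): Long = {
    val sum = new LongBinaryOperator {
      override def applyAsLong(left: Long, right: Long): Long = left + right
    }
    // Identity 0 means an empty batch reports zero rows.
    val total = new LongAccumulator(sum, 0L)

    // One thread per task result, standing in for concurrent completion callbacks.
    val threads = perTaskCounts.map { n =>
      new Thread(new Runnable { override def run(): Unit = total.accumulate(n) })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    total.get()
  }
}
```

Because addition is associative and commutative, the order in which threads call `accumulate` does not affect the result read after all threads have been joined.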
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress
Github user vackosar commented on the issue: https://github.com/apache/spark/pull/21919 @jose-torres @cloud-fan do you have any other suggestions on structure or functionality for the PR now? Or can I focus on finalizing the work and getting it merged?
[GitHub] spark pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkPro...
Github user vackosar commented on a diff in the pull request: https://github.com/apache/spark/pull/21919#discussion_r208107058 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -46,6 +46,9 @@ case class WriteToDataSourceV2(writer: DataSourceWriter, query: LogicalPlan) ext * The physical plan for writing data into data source v2. */ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) extends SparkPlan { --- End diff -- At the moment I think it is overkill. Maybe it is something to consider for the future, though, as long as we won't find any use for those functionalities we will be adding. The one added now may have some use.
[GitHub] spark pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkPro...
Github user vackosar commented on a diff in the pull request: https://github.com/apache/spark/pull/21919#discussion_r207991171 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -179,3 +192,24 @@ class InternalRowDataWriter(rowWriter: DataWriter[Row], encoder: ExpressionEncod override def abort(): Unit = rowWriter.abort() } + +/** + * Collects commit progress on writers. +*/ +trait StreamWriterProgressCollector { --- End diff -- You are right. I hadn't realized that I can access WriteToDataSourceV2Exec directly. Please have a look now.
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress
Github user vackosar commented on the issue: https://github.com/apache/spark/pull/21919 @jose-torres @zsxwing I will exclude the SinkProgress constructor from the binary compatibility check, as this object is constructed internally by Spark. That will remove the current MiMa test failure.
[GitHub] spark pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkPro...
Github user vackosar commented on a diff in the pull request: https://github.com/apache/spark/pull/21919#discussion_r207716505 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriterCommitProgress.java --- @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.sources.v2.writer.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; + +import java.io.Serializable; + +/** + * Sink progress information collected from {@link WriterCommitMessage}. + */ +@InterfaceStability.Evolving +public interface StreamWriterCommitProgress extends Serializable { --- End diff -- I reduced it to a private case class now.
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress
Github user vackosar commented on the issue: https://github.com/apache/spark/pull/21919 @jose-torres I removed the use of commit to report the row count. Would you have a look?
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress v...
Github user vackosar commented on the issue: https://github.com/apache/spark/pull/21919 @jose-torres I haven't thought about this. Let me investigate a bit more. Shall we return to this PR? Do you agree with extending WriterCommitMessage and using it in DataWritingSparkTask#run to return the row count instead of the current implementation?
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress v...
Github user vackosar commented on the issue: https://github.com/apache/spark/pull/21919 Yes, I was hoping to improve that, e.g. by using the filename as the offset or another non-consumer-owned approach, but that would be rather long term. Do you think it is solvable?
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress v...
Github user vackosar commented on the issue: https://github.com/apache/spark/pull/21919 @jose-torres why wouldn't it make sense? According to the documentation, all SS sources have offsets, but not all sinks can also be SS sources, e.g. ForEach doesn't have offsets in general. So usually the offsets should be available on the Sinks, no? Your expert feedback on this is much appreciated!
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress v...
Github user vackosar commented on the issue: https://github.com/apache/spark/pull/21919 @jose-torres thanks for the good point. The reason for placing this into WriterCommitMessage is to set a standard for the information that should be passed at commit time. But I agree that row counting specifically could be moved to e.g. DataWritingSparkTask#run by adding some extension of WriterCommitMessage. There will, however, be metrics which won't be possible to move there, for example the minimum and maximum offset written, [SPARK-24647](https://issues.apache.org/jira/browse/SPARK-24647). Do you agree with extending WriterCommitMessage and using it in DataWritingSparkTask to return the row count?
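The alternative discussed above, counting rows in the task loop rather than inside each writer, might look roughly like the sketch below. Everything in it is hypothetical: `CommitMessage`, `RowWriter`, `TaskResult` and `run` are simplified stand-ins for the DataSourceV2 types and for DataWritingSparkTask#run, and do not reflect what was eventually merged.

```scala
object CountingTaskSketch {
  // Hypothetical stand-ins for the DataSourceV2 writer types; not Spark's interfaces.
  trait CommitMessage
  trait RowWriter[T] {
    def write(row: T): Unit
    def commit(): CommitMessage
  }

  // The writer's own commit message travels together with the count taken by the task.
  final case class TaskResult(message: CommitMessage, numRows: Long)

  // Counts rows in the task loop, so writer implementations need no changes.
  def run[T](rows: Iterator[T], writer: RowWriter[T]): TaskResult = {
    var count = 0L
    rows.foreach { row =>
      writer.write(row)
      count += 1
    }
    TaskResult(writer.commit(), count)
  }
}
```

A metric such as the written offset range could not be derived this way, since only the writer sees the acknowledgements from the sink. That is the limitation the comment points out.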
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress v...
Github user vackosar commented on the issue: https://github.com/apache/spark/pull/21919 @tdas @zsxwing @jose-torres @jerryshao @arunmahadevan @HyukjinKwon, please help with the review and merge.
[GitHub] spark pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkPro...
GitHub user vackosar opened a pull request: https://github.com/apache/spark/pull/21919
[SPARK-24933][SS] Report numOutputRows in SinkProgress via WrittenCom…
## What changes were proposed in this pull request?
SinkProgress should report properties similar to those of SourceProgress, as long as they are available for the given Sink. The count of written rows is a metric available for all Sinks. Since the relevant progress information is with respect to committed rows, the ideal object to carry this info is WriterCommitMessage. For brevity, the implementation focuses only on Sinks with API V2 and on Micro Batch mode. An implementation for Continuous mode will be provided at a later date.
### Before
```
{"description":"org.apache.spark.sql.kafka010.KafkaSourceProvider@3c0bd317"}
```
### After
```
{"description":"org.apache.spark.sql.kafka010.KafkaSourceProvider@3c0bd317","numOutputRows":5000}
```
### This PR is related to:
- https://issues.apache.org/jira/browse/SPARK-24647
- https://issues.apache.org/jira/browse/SPARK-21313
## How was this patch tested?
Existing and new unit tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/AbsaOSS/spark feature/SPARK-24933-numOutputRows
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/21919.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #21919
commit 8c513e4765691bc5e31914066b05091f90096f88
Author: Kosar, Vaclav: Functions Transformation
Date: 2018-06-26T09:41:25Z
[SPARK-24933][SS] Report numOutputRows in SinkProgress via WrittenCommitMessage.