[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r214231328 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * An interface for reporting custom metrics from streaming sources and sinks + */ +@InterfaceStability.Evolving --- End diff -- I opened https://github.com/apache/spark/pull/22296 in case we want. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r214227481 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * An interface for reporting custom metrics from streaming sources and sinks + */ +@InterfaceStability.Evolving --- End diff -- Shall we switch this to `Unstable` instead for now?
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r208109148 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -196,6 +237,18 @@ trait ProgressReporter extends Logging { currentStatus = currentStatus.copy(isTriggerActive = false) } + /** Extract writer from the executed query plan. */ + private def dataSourceWriter: Option[DataSourceWriter] = { +if (lastExecution == null) return None +lastExecution.executedPlan.collect { + case p if p.isInstanceOf[WriteToDataSourceV2Exec] => --- End diff -- i see, thanks!
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r208106031 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -196,6 +237,18 @@ trait ProgressReporter extends Logging { currentStatus = currentStatus.copy(isTriggerActive = false) } + /** Extract writer from the executed query plan. */ + private def dataSourceWriter: Option[DataSourceWriter] = { +if (lastExecution == null) return None +lastExecution.executedPlan.collect { + case p if p.isInstanceOf[WriteToDataSourceV2Exec] => --- End diff -- yes, currently the progress is reported only for micro-batch mode. This should be supported for continuous mode as well when we start reporting progress, but it needs some more work - https://issues.apache.org/jira/browse/SPARK-23887
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r208089043 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -196,6 +237,18 @@ trait ProgressReporter extends Logging { currentStatus = currentStatus.copy(isTriggerActive = false) } + /** Extract writer from the executed query plan. */ + private def dataSourceWriter: Option[DataSourceWriter] = { +if (lastExecution == null) return None +lastExecution.executedPlan.collect { + case p if p.isInstanceOf[WriteToDataSourceV2Exec] => --- End diff -- this only works for micro-batch mode; do we have a plan to support continuous mode?
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21721
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r207232187 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -163,7 +163,27 @@ class SourceProgress protected[sql]( val endOffset: String, val numInputRows: Long, val inputRowsPerSecond: Double, - val processedRowsPerSecond: Double) extends Serializable { + val processedRowsPerSecond: Double, + val customMetrics: String) extends Serializable { + + /** SourceProgress without custom metrics. */ + def this( --- End diff -- changed.
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r207090934 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -163,7 +163,27 @@ class SourceProgress protected[sql]( val endOffset: String, val numInputRows: Long, val inputRowsPerSecond: Double, - val processedRowsPerSecond: Double) extends Serializable { + val processedRowsPerSecond: Double, + val customMetrics: String) extends Serializable { + + /** SourceProgress without custom metrics. */ + def this( --- End diff -- I would make it `protected[sql] def this(`.
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r207042893 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -163,7 +163,8 @@ class SourceProgress protected[sql]( val endOffset: String, val numInputRows: Long, val inputRowsPerSecond: Double, - val processedRowsPerSecond: Double) extends Serializable { + val processedRowsPerSecond: Double, + val customMetrics: Option[JValue] = None) extends Serializable { --- End diff -- Refactored to a JSON string instead of JValue.
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206392487 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -163,7 +163,8 @@ class SourceProgress protected[sql]( val endOffset: String, val numInputRows: Long, val inputRowsPerSecond: Double, - val processedRowsPerSecond: Double) extends Serializable { + val processedRowsPerSecond: Double, + val customMetrics: Option[JValue] = None) extends Serializable { --- End diff -- @HyukjinKwon Nice find. I missed it while reviewing. Btw, FYI, in #21469 I'm adding a new field with a default value in StateOperatorProgress, like `val customMetrics: ju.Map[String, JLong] = new ju.HashMap()`, and MiMa doesn't complain. https://github.com/apache/spark/pull/21469/files#diff-e09301244e3c6b1a69eda6c4bd2ddb52 @arunmahadevan Maybe `ju.Map[String, JLong]` will also work here.
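The Java-friendly shape being suggested can be sketched roughly as follows. This is a hypothetical, trimmed-down stand-in for `StateOperatorProgress`, not the real class: the new field uses `java.util.Map` with boxed longs and a default empty value, so existing Scala call sites keep compiling unchanged.

```scala
import java.{util => ju}
import java.lang.{Long => JLong}

// Hypothetical stand-in for StateOperatorProgress: the added field is a
// Java-friendly java.util.Map with a default empty value.
class OperatorProgressSketch(
    val numRowsTotal: Long,
    val customMetrics: ju.Map[String, JLong] = new ju.HashMap[String, JLong]())

// Old arity still works; the new field defaults to an empty map.
val progress = new OperatorProgressSketch(42L)
```

Whether this passes MiMa for the real class is the point under discussion above; the sketch only shows the source-level shape.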
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206374910 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -163,7 +163,8 @@ class SourceProgress protected[sql]( val endOffset: String, val numInputRows: Long, val inputRowsPerSecond: Double, - val processedRowsPerSecond: Double) extends Serializable { + val processedRowsPerSecond: Double, + val customMetrics: Option[JValue] = None) extends Serializable { --- End diff -- Wait .. this is an exposed API, right? I guess this is exposed to the Java API too (for instance `query.lastProgress().sources()`)? In that case, we should avoid Scala's Option and `org.json4s.*`. If this is supposed to be hidden here, I think we should find a way to hide this with a package-level access modifier.
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206374120 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * An interface for reporting custom metrics from streaming sources and sinks + */ +@InterfaceStability.Evolving +public interface CustomMetrics { --- End diff -- The Java side should also be 2-space indented (see "Code Style Guide" in https://spark.apache.org/contributing.html)
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206373655 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -143,18 +151,46 @@ trait ProgressReporter extends Logging { } logDebug(s"Execution stats: $executionStats") +// extracts custom metrics from readers and writers +def extractMetrics(getMetrics: () => Option[CustomMetrics], + onInvalidMetrics: (Exception) => Unit): Option[JValue] = { --- End diff -- nit:
```scala
def extractMetrics(
    getMetrics: () => Option[CustomMetrics],
    onInvalidMetrics: (Exception) => Unit): Option[JValue] = {
```
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206373303 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -163,7 +163,8 @@ class SourceProgress protected[sql]( val endOffset: String, val numInputRows: Long, val inputRowsPerSecond: Double, - val processedRowsPerSecond: Double) extends Serializable { + val processedRowsPerSecond: Double, + val customMetrics: Option[JValue] = None) extends Serializable { --- End diff -- Default values do not work in the Java API, and MiMa probably complains about this. I think another constructor should be added instead of a default value to work around this.
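The suggested workaround can be sketched like this. The class below is a hypothetical, trimmed-down stand-in for `SourceProgress`, not the real one: a secondary constructor keeps the old arity callable from Java and avoids relying on a default parameter value, which Java callers cannot see.

```scala
// Hypothetical stand-in for SourceProgress, illustrating the secondary
// constructor approach.
class SourceProgressSketch(
    val description: String,
    val numInputRows: Long,
    val customMetrics: String) extends Serializable {

  // Old two-argument constructor, forwarding null for "no custom metrics".
  def this(description: String, numInputRows: Long) =
    this(description, numInputRows, null)
}
```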
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206349942 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.sources.v2.writer.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.CustomMetrics; +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; + +/** + * A mix in interface for {@link DataSourceWriter}. Data source writers can implement this --- End diff -- OK got your intention. I think it makes sense. I'm OK with all three options and personally prefer 1 or 2 if the intention is to mix-in, but let's see committers' feedback on this.
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206347147 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.CustomMetrics; +import org.apache.spark.sql.sources.v2.reader.DataSourceReader; + +/** + * A mix in interface for {@link DataSourceReader}. Data source readers can implement this + * interface to report custom metrics that gets reported under the + * {@link org.apache.spark.sql.streaming.SourceProgress} + * + */ +@InterfaceStability.Evolving +public interface SupportsCustomReaderMetrics extends DataSourceReader { +/** + * Returns custom metrics specific to this data source. + */ +CustomMetrics getCustomMetrics(); + +/** + * Invoked if the custom metrics returned by {@link #getCustomMetrics()} is invalid --- End diff -- Updated javadoc to explain the same. 
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206325719 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.CustomMetrics; +import org.apache.spark.sql.sources.v2.reader.DataSourceReader; + +/** + * A mix in interface for {@link DataSourceReader}. Data source readers can implement this + * interface to report custom metrics that gets reported under the + * {@link org.apache.spark.sql.streaming.SourceProgress} + * + */ +@InterfaceStability.Evolving +public interface SupportsCustomReaderMetrics extends DataSourceReader { +/** + * Returns custom metrics specific to this data source. + */ +CustomMetrics getCustomMetrics(); + +/** + * Invoked if the custom metrics returned by {@link #getCustomMetrics()} is invalid --- End diff -- Oh wait, this is the same thing we talked about in the initial round of review. 
I think "throw an error when developing the connector so you can make sure your metrics work right" would still be a good example.
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206322694 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.CustomMetrics; +import org.apache.spark.sql.sources.v2.reader.DataSourceReader; + +/** + * A mix in interface for {@link DataSourceReader}. Data source readers can implement this + * interface to report custom metrics that gets reported under the + * {@link org.apache.spark.sql.streaming.SourceProgress} + * + */ +@InterfaceStability.Evolving +public interface SupportsCustomReaderMetrics extends DataSourceReader { +/** + * Returns custom metrics specific to this data source. + */ +CustomMetrics getCustomMetrics(); + +/** + * Invoked if the custom metrics returned by {@link #getCustomMetrics()} is invalid --- End diff -- I thought this was a bit convoluted at first, but on reflection I can understand why this additional flexibility is valuable. 
I think it'd be worth writing an example here of what a source might want to do other than ignore the invalid metrics.
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206241038 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.sources.v2.writer.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.CustomMetrics; +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; + +/** + * A mix in interface for {@link DataSourceWriter}. Data source writers can implement this --- End diff -- The intention was to restrict the mixin so that it can be applied only to `DataSourceReader` and `DataSourceWriter` (the pattern followed in other mixins) by inheriting the appropriate types. Unfortunately there's no common ancestor for the mixin to inherit from, so I had to duplicate the interface. Agreed that it's not ideal. A few options:
1. Have a common ancestor marker interface (say `DataSourceComponent`) which is the super type of `DataSourceReader` and `DataSourceWriter`. Then we can have a single mixin that is a subtype of that interface. We may encounter similar usages for other mixins in the future.
2. The mixin does not inherit anything (neither DataSourceReader nor DataSourceWriter). Here we cannot impose a restriction on the type of classes the mixin can be applied to.
3. Duplicate interfaces (the proposed option in the patch).
I prefer option 1, but would like to proceed based on the feedback.
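Option 1 could look roughly like the sketch below. All names here are hypothetical stand-ins for the Java interfaces under discussion (the real ones are Java; Scala traits are used for brevity): a common marker ancestor lets a single mix-in be restricted to data source components of either kind.

```scala
// Hypothetical marker ancestor shared by readers and writers.
trait DataSourceComponent
trait ReaderLike extends DataSourceComponent   // stands in for DataSourceReader
trait WriterLike extends DataSourceComponent   // stands in for DataSourceWriter

trait CustomMetricsLike { def json(): String }

// One mix-in instead of two, applicable only to data source components.
trait SupportsCustomMetrics extends DataSourceComponent {
  def getCustomMetrics: CustomMetricsLike
}

class ExampleReader extends ReaderLike with SupportsCustomMetrics {
  override def getCustomMetrics: CustomMetricsLike = new CustomMetricsLike {
    override def json(): String = """{"rowsRead": 42}"""
  }
}
```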
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206237610 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -143,18 +150,50 @@ trait ProgressReporter extends Logging { } logDebug(s"Execution stats: $executionStats") +// extracts custom metrics from readers and writers +def extractMetrics(getMetrics: () => CustomMetrics, + onInvalidMetrics: (Exception) => Unit): Option[JValue] = { + val metrics = getMetrics() + if (metrics != null) { --- End diff -- Replaced it with `Option` and `map`
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206010782 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.sources.v2.writer.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.CustomMetrics; +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; + +/** + * A mix in interface for {@link DataSourceWriter}. Data source writers can implement this --- End diff -- If we intend to create a new interface as a mix-in, we may not need to create individual interfaces for DataSourceReader and DataSourceWriter. We could have only one interface and let DataSourceReader and DataSourceWriter add such a mix-in interface.
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206010370 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -143,18 +150,50 @@ trait ProgressReporter extends Logging { } logDebug(s"Execution stats: $executionStats") +// extracts custom metrics from readers and writers +def extractMetrics(getMetrics: () => CustomMetrics, + onInvalidMetrics: (Exception) => Unit): Option[JValue] = { + val metrics = getMetrics() + if (metrics != null) { --- End diff -- We could de-indent via an early return: it would be simpler because there's nothing to do but return `None` when metrics is null, and the style guide lists such a case as one of the exceptions where a return statement is preferred.
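The suggested early-return shape can be sketched as follows, with a simplified signature for illustration (the real method takes `() => CustomMetrics` and returns `Option[JValue]`):

```scala
// Early return keeps the happy path un-indented when the source
// reports no metrics.
def extractMetrics(getMetrics: () => String): Option[String] = {
  val metrics = getMetrics()
  if (metrics == null) return None  // nothing else to do for a null result
  Some(metrics)
}
```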
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206009928 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -143,18 +150,50 @@ trait ProgressReporter extends Logging { } logDebug(s"Execution stats: $executionStats") +// extracts custom metrics from readers and writers +def extractMetrics(getMetrics: () => CustomMetrics, + onInvalidMetrics: (Exception) => Unit): Option[JValue] = { + val metrics = getMetrics() + if (metrics != null) { +try { + Some(parse(metrics.json())) +} catch { + case ex: Exception => onInvalidMetrics(ex) --- End diff -- https://github.com/databricks/scala-style-guide#exception-handling-try-vs-try According to the guide, this line needs to be replaced with `case NonFatal(e) =>`, and I'd place `onInvalidMetrics` and `None` at the same indentation.
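The style-guide pattern being suggested looks roughly like this. It is a simplified sketch: the callback takes a `Throwable` here, and JSON parsing is stubbed with a trivial check instead of json4s `parse`.

```scala
import scala.util.control.NonFatal

// Catch only non-fatal errors; report them via the callback and
// fall back to None.
def parseMetrics(raw: String, onInvalidMetrics: Throwable => Unit): Option[String] =
  try {
    if (raw.trim.startsWith("{")) Some(raw)
    else throw new IllegalArgumentException(s"not valid JSON: $raw")
  } catch {
    case NonFatal(e) =>
      onInvalidMetrics(e)
      None
  }
```

Unlike a bare `case e: Exception`, `NonFatal` deliberately lets fatal errors such as `OutOfMemoryError` propagate.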
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r201402523 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala --- @@ -95,4 +95,25 @@ private object JsonUtils { } Serialization.write(result) } + + /** + * Write per-topic partition lag as json string --- End diff -- I'd suggest handling the custom metrics for Kafka outside the scope of this PR. Maybe we should have a default maxOffsets, but given that we don't I'm worried about adding a metric that's misleading in the default case.
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r200730270 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala --- @@ -95,4 +95,25 @@ private object JsonUtils { } Serialization.write(result) } + + /** + * Write per-topic partition lag as json string --- End diff -- This is the difference between the latest offsets in Kafka at the time the metrics are reported (just after a micro-batch completes) and the latest offset Spark has processed. It can be 0 if Spark keeps up with the rate at which messages are ingested into the Kafka topics (steady state). I would assume we would always want to set some reasonable micro-batch size by setting `maxOffsetsPerTrigger`. Otherwise Spark can end up processing the entire data in the topics in one micro-batch (e.g. if the starting offset is set to earliest, or the streaming job is stopped for some time and restarted). IMO, we should address this by setting some sane defaults, which are currently missing. If we want to handle the custom metrics for Kafka outside the scope of this PR, I will raise a separate one, but this can be really useful for identifying issues like data skew in some partitions or other issues causing Spark to not keep up with the ingestion rate.
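The lag computation being discussed can be sketched as below. This is a hypothetical helper, not the actual `JsonUtils` code: per-partition lag is the latest available offset minus the latest processed offset, floored at zero so a fully caught-up source reports 0 (the steady state described above).

```scala
// Hypothetical per-partition lag: latest available offset minus latest
// processed offset, never negative. Unprocessed partitions are treated
// as starting from offset 0, a simplification for illustration.
def partitionLag(
    latestOffsets: Map[String, Long],
    processedOffsets: Map[String, Long]): Map[String, Long] =
  latestOffsets.map { case (tp, latest) =>
    tp -> math.max(0L, latest - processedOffsets.getOrElse(tp, 0L))
  }
```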
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r200730240 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -178,12 +180,18 @@ class SourceProgress protected[sql]( if (value.isNaN || value.isInfinity) JNothing else JDouble(value) } -("description" -> JString(description)) ~ +val jsonVal = ("description" -> JString(description)) ~ ("startOffset" -> tryParse(startOffset)) ~ ("endOffset" -> tryParse(endOffset)) ~ ("numInputRows" -> JInt(numInputRows)) ~ ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) + +if (customMetrics != null) { + jsonVal ~ ("customMetrics" -> tryParse(customMetrics.json())) --- End diff -- Currently in case of error, it just reports the JSON string as is (similar to start/end offsets). However I agree we can add error reporting to this API. Will address.
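The contract being discussed is that a source's `CustomMetrics` implementation returns a single JSON string, which `SourceProgress` then tries to parse into the progress report. A hand-rolled sketch of what such a `json()` payload might look like for the Kafka lag map (hypothetical; the actual PR serializes via `JsonUtils`/`Serialization.write` in Scala):

```java
import java.util.Map;
import java.util.TreeMap;

public class LagJsonSketch {
    /** Serialize a per-partition lag map as a JSON object string. */
    static String lagToJson(Map<String, Long> lag) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        // TreeMap gives a deterministic key order for the example output.
        for (Map.Entry<String, Long> e : new TreeMap<>(lag).entrySet()) {
            if (!first) sb.append(',');
            sb.append('"').append(e.getKey()).append("\":").append(e.getValue());
            first = false;
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        Map<String, Long> lag = new TreeMap<>();
        lag.put("topic-0", 20L);
        lag.put("topic-1", 0L);
        System.out.println(lagToJson(lag));  // {"topic-0":20,"topic-1":0}
    }
}
```

If the returned string is not valid JSON, the fallback described above means it is embedded as a plain string rather than a nested object, which is why the reviewers debate surfacing a parse error instead of failing silently.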
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r200538008 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala --- @@ -95,4 +95,25 @@ private object JsonUtils { } Serialization.write(result) } + + /** + * Write per-topic partition lag as json string --- End diff -- Is "lag" here just the difference (at the time a batch ends) between the last offset Spark knows about and the last offset Spark has processed? I'm not sure this is super useful to know. If maxOffsets isn't set it's always going to be 0, no matter how far Spark gets behind the Kafka cluster.
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r200537533 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala --- @@ -379,3 +384,16 @@ private[kafka010] case class KafkaMicroBatchInputPartitionReader( } } } + +// Currently reports per topic-partition lag. --- End diff -- nit: javadoc style for top level comments
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r200537276 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsCustomMetrics.java --- @@ -0,0 +1,30 @@ +/* --- End diff -- This should probably be in v2/reader/streaming.
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r200537454 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -178,12 +180,18 @@ class SourceProgress protected[sql]( if (value.isNaN || value.isInfinity) JNothing else JDouble(value) } -("description" -> JString(description)) ~ +val jsonVal = ("description" -> JString(description)) ~ ("startOffset" -> tryParse(startOffset)) ~ ("endOffset" -> tryParse(endOffset)) ~ ("numInputRows" -> JInt(numInputRows)) ~ ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) + +if (customMetrics != null) { + jsonVal ~ ("customMetrics" -> tryParse(customMetrics.json())) --- End diff -- Is there any way to get an error to the user if their custom metrics fail to parse? I'm not entirely sure that's the right thing to do, but I worry that it'll be hard to develop against this API if we just silently drop malformed metrics.
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
GitHub user arunmahadevan opened a pull request: https://github.com/apache/spark/pull/21721 [SPARK-24748][SS] Support for reporting custom metrics via StreamingQuery Progress ## What changes were proposed in this pull request? Currently the Structured Streaming sources and sinks do not have a way to report custom metrics. Providing an option to report custom metrics and making them available via Streaming Query progress can enable sources and sinks to report custom progress information (e.g. lag metrics for the Kafka source). Similar metrics can be reported for sinks as well, but I would like to get initial feedback before proceeding further. ## How was this patch tested? New and existing unit tests. Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/arunmahadevan/spark SPARK-24748 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21721.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21721 commit b7b2c3b1c9242fe205869f108548248f71ff8203 Author: Arun Mahadevan Date: 2018-07-06T01:51:50Z [SPARK-24748][SS] Support for reporting custom metrics via Streaming Query Progress
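With this change, a source's entry in the streaming query progress would gain a `customMetrics` field next to the existing ones. An illustrative (not verbatim) shape, where the standard field names come from the `SourceProgress` diff quoted in this thread and the `lag` payload is only a hypothetical example of what a Kafka source might report:

```json
{
  "description" : "KafkaSource[Subscribe[topic-0]]",
  "startOffset" : { "topic-0" : { "0" : 100 } },
  "endOffset" : { "topic-0" : { "0" : 120 } },
  "numInputRows" : 20,
  "inputRowsPerSecond" : 10.0,
  "processedRowsPerSecond" : 40.0,
  "customMetrics" : { "lag" : { "topic-0" : 5 } }
}
```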