[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-08-30 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r214231328
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An interface for reporting custom metrics from streaming sources and sinks
+ */
+@InterfaceStability.Evolving
--- End diff --

I opened https://github.com/apache/spark/pull/22296 in case we want it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-08-30 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r214227481
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An interface for reporting custom metrics from streaming sources and sinks
+ */
+@InterfaceStability.Evolving
--- End diff --

Shall we switch this to `Unstable` instead for now?


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-08-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r208109148
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 ---
@@ -196,6 +237,18 @@ trait ProgressReporter extends Logging {
 currentStatus = currentStatus.copy(isTriggerActive = false)
   }
 
+  /** Extract writer from the executed query plan. */
+  private def dataSourceWriter: Option[DataSourceWriter] = {
+if (lastExecution == null) return None
+lastExecution.executedPlan.collect {
+  case p if p.isInstanceOf[WriteToDataSourceV2Exec] =>
--- End diff --

I see, thanks!


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-08-06 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r208106031
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 ---
@@ -196,6 +237,18 @@ trait ProgressReporter extends Logging {
 currentStatus = currentStatus.copy(isTriggerActive = false)
   }
 
+  /** Extract writer from the executed query plan. */
+  private def dataSourceWriter: Option[DataSourceWriter] = {
+if (lastExecution == null) return None
+lastExecution.executedPlan.collect {
+  case p if p.isInstanceOf[WriteToDataSourceV2Exec] =>
--- End diff --

Yes, currently the progress is reported only for micro-batch mode. This should be supported for continuous mode as well once we start reporting progress there, but it needs some more work: https://issues.apache.org/jira/browse/SPARK-23887


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-08-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r208089043
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 ---
@@ -196,6 +237,18 @@ trait ProgressReporter extends Logging {
 currentStatus = currentStatus.copy(isTriggerActive = false)
   }
 
+  /** Extract writer from the executed query plan. */
+  private def dataSourceWriter: Option[DataSourceWriter] = {
+if (lastExecution == null) return None
+lastExecution.executedPlan.collect {
+  case p if p.isInstanceOf[WriteToDataSourceV2Exec] =>
--- End diff --

This only works for micro-batch mode; do we have a plan to support continuous mode?


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-08-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21721


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-08-02 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r207232187
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -163,7 +163,27 @@ class SourceProgress protected[sql](
   val endOffset: String,
   val numInputRows: Long,
   val inputRowsPerSecond: Double,
-  val processedRowsPerSecond: Double) extends Serializable {
+  val processedRowsPerSecond: Double,
+  val customMetrics: String) extends Serializable {
+
+  /** SourceProgress without custom metrics. */
+  def this(
--- End diff --

Changed.


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-08-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r207090934
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -163,7 +163,27 @@ class SourceProgress protected[sql](
   val endOffset: String,
   val numInputRows: Long,
   val inputRowsPerSecond: Double,
-  val processedRowsPerSecond: Double) extends Serializable {
+  val processedRowsPerSecond: Double,
+  val customMetrics: String) extends Serializable {
+
+  /** SourceProgress without custom metrics. */
+  def this(
--- End diff --

I would make it `protected[sql] def this(`.


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-08-01 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r207042893
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -163,7 +163,8 @@ class SourceProgress protected[sql](
   val endOffset: String,
   val numInputRows: Long,
   val inputRowsPerSecond: Double,
-  val processedRowsPerSecond: Double) extends Serializable {
+  val processedRowsPerSecond: Double,
+  val customMetrics: Option[JValue] = None) extends Serializable {
--- End diff --

Refactored to a JSON string instead of `JValue`.


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206392487
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -163,7 +163,8 @@ class SourceProgress protected[sql](
   val endOffset: String,
   val numInputRows: Long,
   val inputRowsPerSecond: Double,
-  val processedRowsPerSecond: Double) extends Serializable {
+  val processedRowsPerSecond: Double,
+  val customMetrics: Option[JValue] = None) extends Serializable {
--- End diff --

@HyukjinKwon 
Nice find. I missed it while reviewing.

Btw, FYI, in #21469 I'm adding a new field with a default value in StateOperatorProgress, like `val customMetrics: ju.Map[String, JLong] = new ju.HashMap()`, and MiMa doesn't complain.

https://github.com/apache/spark/pull/21469/files#diff-e09301244e3c6b1a69eda6c4bd2ddb52

@arunmahadevan 
Maybe `ju.Map[String, JLong]` will also work here.
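The `java.util.Map`-based shape suggested above can be sketched roughly as follows. This is a hedged illustration only: the class name and fields are hypothetical, not the real `SourceProgress`, and the compatibility claim assumes callers use the constructors shown.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of custom metrics exposed as a plain Map<String, Long>
// (Java-friendly, no Scala Option or json4s types), with the old
// constructor delegating to the new one using an empty map so
// existing callers keep compiling.
class SourceProgressSketch {
  final long numInputRows;
  final Map<String, Long> customMetrics;

  SourceProgressSketch(long numInputRows, Map<String, Long> customMetrics) {
    this.numInputRows = numInputRows;
    this.customMetrics = customMetrics;
  }

  // Old-arity constructor kept for source/binary compatibility.
  SourceProgressSketch(long numInputRows) {
    this(numInputRows, new HashMap<>());
  }
}
```

An explicit overloaded constructor, rather than a default argument, is what keeps the old signature visible to Java callers.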


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206374910
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -163,7 +163,8 @@ class SourceProgress protected[sql](
   val endOffset: String,
   val numInputRows: Long,
   val inputRowsPerSecond: Double,
-  val processedRowsPerSecond: Double) extends Serializable {
+  val processedRowsPerSecond: Double,
+  val customMetrics: Option[JValue] = None) extends Serializable {
--- End diff --

Wait .. this is an exposed API, right? I guess this is exposed to the Java API too (for instance `query.lastProgress().sources()`)? In that case, we should avoid Scala's `Option` and `org.json4s.*`. If this is supposed to be hidden, I think we should find a way to hide it with a package-level access modifier.




---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206374120
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An interface for reporting custom metrics from streaming sources and sinks
+ */
+@InterfaceStability.Evolving
+public interface CustomMetrics {
--- End diff --

The Java side should also be 2-space indented (see "Code Style Guide" in https://spark.apache.org/contributing.html).


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206373655
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 ---
@@ -143,18 +151,46 @@ trait ProgressReporter extends Logging {
 }
 logDebug(s"Execution stats: $executionStats")
 
+// extracts custom metrics from readers and writers
+def extractMetrics(getMetrics: () => Option[CustomMetrics],
+  onInvalidMetrics: (Exception) => Unit): Option[JValue] = {
--- End diff --

nit:

```scala
def extractMetrics(
getMetrics: () => Option[CustomMetrics],
onInvalidMetrics: (Exception) => Unit): Option[JValue] = {
```



---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206373303
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -163,7 +163,8 @@ class SourceProgress protected[sql](
   val endOffset: String,
   val numInputRows: Long,
   val inputRowsPerSecond: Double,
-  val processedRowsPerSecond: Double) extends Serializable {
+  val processedRowsPerSecond: Double,
+  val customMetrics: Option[JValue] = None) extends Serializable {
--- End diff --

Default values do not work in the Java API, and MiMa probably complains about this. I think another constructor should be added instead of a default value to work around this.


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206349942
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources.v2.writer.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.CustomMetrics;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+
+/**
+ * A mix in interface for {@link DataSourceWriter}. Data source writers can implement this
--- End diff --

OK, got your intention. I think it makes sense. I'm OK with all three options and personally prefer 1 or 2 if the intention is to mix in, but let's see the committers' feedback on this.


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206347147
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.CustomMetrics;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+
+/**
+ * A mix in interface for {@link DataSourceReader}. Data source readers can implement this
+ * interface to report custom metrics that gets reported under the
+ * {@link org.apache.spark.sql.streaming.SourceProgress}
+ *
+ */
+@InterfaceStability.Evolving
+public interface SupportsCustomReaderMetrics extends DataSourceReader {
+/**
+ * Returns custom metrics specific to this data source.
+ */
+CustomMetrics getCustomMetrics();
+
+/**
+ * Invoked if the custom metrics returned by {@link #getCustomMetrics()} is invalid
--- End diff --

Updated javadoc to explain the same.


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206325719
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.CustomMetrics;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+
+/**
+ * A mix in interface for {@link DataSourceReader}. Data source readers can implement this
+ * interface to report custom metrics that gets reported under the
+ * {@link org.apache.spark.sql.streaming.SourceProgress}
+ *
+ */
+@InterfaceStability.Evolving
+public interface SupportsCustomReaderMetrics extends DataSourceReader {
+/**
+ * Returns custom metrics specific to this data source.
+ */
+CustomMetrics getCustomMetrics();
+
+/**
+ * Invoked if the custom metrics returned by {@link #getCustomMetrics()} is invalid
--- End diff --

Oh wait, this is the same thing we talked about in the initial round of 
review. I think "throw an error when developing the connector so you can make 
sure your metrics work right" would still be a good example.
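The "throw an error when developing the connector" behavior mentioned above could look roughly like this. This is a hypothetical sketch: the class name, `strictMode` flag, and method names are illustrative, not part of the Spark API.

```java
// An onInvalidMetrics callback that rethrows in a strict (development)
// mode and only counts the dropped payloads otherwise.
class InvalidMetricsHandler {
  private final boolean strictMode;
  private int dropped = 0;

  InvalidMetricsHandler(boolean strictMode) {
    this.strictMode = strictMode;
  }

  void onInvalidMetrics(Exception ex) {
    if (strictMode) {
      // Surface the problem immediately so broken metrics are caught in tests.
      throw new IllegalStateException("Custom metrics are not valid JSON", ex);
    }
    dropped++; // in production, skip the bad payload and keep reporting progress
  }

  int droppedCount() {
    return dropped;
  }
}
```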


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206322694
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.CustomMetrics;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+
+/**
+ * A mix in interface for {@link DataSourceReader}. Data source readers can implement this
+ * interface to report custom metrics that gets reported under the
+ * {@link org.apache.spark.sql.streaming.SourceProgress}
+ *
+ */
+@InterfaceStability.Evolving
+public interface SupportsCustomReaderMetrics extends DataSourceReader {
+/**
+ * Returns custom metrics specific to this data source.
+ */
+CustomMetrics getCustomMetrics();
+
+/**
+ * Invoked if the custom metrics returned by {@link #getCustomMetrics()} is invalid
--- End diff --

I thought this was a bit convoluted at first, but on reflection I can 
understand why this additional flexibility is valuable. I think it'd be worth 
writing an example here of what a source might want to do other than ignore the 
invalid metrics.


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206241038
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources.v2.writer.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.CustomMetrics;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+
+/**
+ * A mix in interface for {@link DataSourceWriter}. Data source writers can implement this
--- End diff --

The intention was to restrict the mixin so that it can be applied only to `DataSourceReader` and `DataSourceWriter` (a similar pattern is followed in other mixins) by inheriting the appropriate types. Unfortunately there's no common ancestor for the mixin to inherit from, so I had to duplicate the interface. Agreed that it's not ideal.

A few options:

1. Have a common ancestor marker interface (say `DataSourceComponent`) which is the super type of `DataSourceReader` and `DataSourceWriter`. Then we can have a single mixin that is a subtype of that interface. We may encounter similar usages for other mixins in the future.
2. The mixin does not inherit anything (neither DataSourceReader nor DataSourceWriter). Here we cannot impose a restriction on the type of classes the mixin can be applied to.
3. Duplicate interfaces (the proposed option in this patch).

I prefer option 1, but would like to proceed based on the feedback.
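Option 1 above can be sketched as follows. All names here are illustrative stand-ins, not the actual Spark interfaces; the point is only that a single mix-in can be constrained to data source components via a shared marker ancestor.

```java
// A hypothetical DataSourceComponent marker interface as the common
// ancestor of reader and writer, so one metrics mix-in serves both sides.
interface DataSourceComponent {}

interface ReaderSketch extends DataSourceComponent {}

interface WriterSketch extends DataSourceComponent {}

// Single mix-in, restricted to data source components by its supertype.
interface SupportsCustomMetricsSketch extends DataSourceComponent {
  String metricsJson();
}

// A reader that opts into custom metrics by mixing in the interface.
class MetricsReader implements ReaderSketch, SupportsCustomMetricsSketch {
  @Override
  public String metricsJson() {
    return "{\"numBuffered\": 0}";
  }
}
```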


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206237610
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 ---
@@ -143,18 +150,50 @@ trait ProgressReporter extends Logging {
 }
 logDebug(s"Execution stats: $executionStats")
 
+// extracts custom metrics from readers and writers
+def extractMetrics(getMetrics: () => CustomMetrics,
+  onInvalidMetrics: (Exception) => Unit): Option[JValue] = {
+  val metrics = getMetrics()
+  if (metrics != null) {
--- End diff --

Replaced it with `Option` and `map`


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206010782
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources.v2.writer.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.CustomMetrics;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+
+/**
+ * A mix in interface for {@link DataSourceWriter}. Data source writers can implement this
--- End diff --

If we intend to create a new interface as a mix-in, we may not need to create individual interfaces for DataSourceReader and DataSourceWriter. We could have only one interface and let DataSourceReader and DataSourceWriter extend that mix-in interface.


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206010370
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 ---
@@ -143,18 +150,50 @@ trait ProgressReporter extends Logging {
 }
 logDebug(s"Execution stats: $executionStats")
 
+// extracts custom metrics from readers and writers
+def extractMetrics(getMetrics: () => CustomMetrics,
+  onInvalidMetrics: (Exception) => Unit): Option[JValue] = {
+  val metrics = getMetrics()
+  if (metrics != null) {
--- End diff --

We could de-indent via an early return: it would be simpler because there's nothing to do but return `None` if metrics is null, and the style guide lists this as one of the exceptional cases where a return statement is preferred.


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206009928
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 ---
@@ -143,18 +150,50 @@ trait ProgressReporter extends Logging {
 }
 logDebug(s"Execution stats: $executionStats")
 
+// extracts custom metrics from readers and writers
+def extractMetrics(getMetrics: () => CustomMetrics,
+  onInvalidMetrics: (Exception) => Unit): Option[JValue] = {
+  val metrics = getMetrics()
+  if (metrics != null) {
+try {
+  Some(parse(metrics.json()))
+} catch {
+  case ex: Exception => onInvalidMetrics(ex)
--- End diff --


https://github.com/databricks/scala-style-guide#exception-handling-try-vs-try

According to the guide, this line needs to be replaced with `case NonFatal(e) =>`, and I'd place `onInvalidMetrics` and `None` at the same indentation.
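The two style suggestions on this snippet (return early when there are no metrics, and catch only recoverable exceptions) can be illustrated in Java roughly as below. Scala's `NonFatal` has no direct Java equivalent; catching `Exception` rather than `Throwable` plays a similar role of letting fatal `Error`s propagate. The names are hypothetical.

```java
import java.util.Optional;

// Illustrative analogue of the suggested extractMetrics shape.
class ExtractMetricsSketch {
  interface MetricsSource {
    String json();
  }

  static Optional<String> extract(MetricsSource source) {
    if (source == null) return Optional.empty(); // early return de-indents the rest
    try {
      return Optional.of(source.json());
    } catch (Exception e) {
      // this is where onInvalidMetrics(e) would be invoked
      return Optional.empty();
    }
  }
}
```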


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-10 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r201402523
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
 ---
@@ -95,4 +95,25 @@ private object JsonUtils {
 }
 Serialization.write(result)
   }
+
+  /**
+   * Write per-topic partition lag as json string
--- End diff --

I'd suggest handling the custom metrics for Kafka outside the scope of this 
PR. Maybe we should have a default maxOffsets, but given that we don't I'm 
worried about adding a metric that's misleading in the default case.


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-06 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r200730270
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
 ---
@@ -95,4 +95,25 @@ private object JsonUtils {
 }
 Serialization.write(result)
   }
+
+  /**
+   * Write per-topic partition lag as json string
--- End diff --

This is the difference between the latest offsets in Kafka at the time the 
metrics are reported (just after a micro-batch completes) and the latest offsets 
Spark has processed. It can be 0 if Spark keeps up with the rate at which 
messages are ingested into the Kafka topics (steady state).

I would assume we would always want to set some reasonable micro-batch 
sizes by setting `maxOffsetsPerTrigger`. Otherwise Spark can end up processing 
the entire data in the topics in one micro-batch (e.g. if the starting offset is 
set to earliest, or the streaming job is stopped for some time and restarted). 
IMO, we should address this by setting some sane defaults, which are currently 
missing.

If we want to handle the custom metrics for Kafka outside the scope of this 
PR, I will raise a separate one for this. But this can be really useful for 
identifying issues like data skew in some partitions, or other issues causing 
Spark to not keep up with the ingestion rate.
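The lag computation described above can be sketched as follows. This is a hedged illustration, not the actual `KafkaCustomMetrics` implementation: per topic-partition, lag is the latest offset available in Kafka minus the latest offset Spark has processed, and the names here (`partitionLag`, the simplified `TopicPartition` case class) are illustrative.

```scala
// Simplified stand-in for org.apache.kafka.common.TopicPartition.
case class TopicPartition(topic: String, partition: Int)

// Per-partition lag: latest offset in Kafka minus the latest offset
// Spark has processed. A partition with no processed offset yet counts
// its full backlog as lag.
def partitionLag(
    latestOffsets: Map[TopicPartition, Long],
    processedOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] =
  latestOffsets.map { case (tp, latest) =>
    tp -> math.max(0L, latest - processedOffsets.getOrElse(tp, 0L))
  }
```

In the steady state described above (Spark keeping up with ingestion), every value in the resulting map is 0.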


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-06 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r200730240
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -178,12 +180,18 @@ class SourceProgress protected[sql](
   if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
 }
 
-("description" -> JString(description)) ~
+val jsonVal = ("description" -> JString(description)) ~
   ("startOffset" -> tryParse(startOffset)) ~
   ("endOffset" -> tryParse(endOffset)) ~
   ("numInputRows" -> JInt(numInputRows)) ~
   ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
   ("processedRowsPerSecond" -> 
safeDoubleToJValue(processedRowsPerSecond))
+
+if (customMetrics != null) {
+  jsonVal ~ ("customMetrics" -> tryParse(customMetrics.json()))
--- End diff --

Currently, in case of error, it just reports the JSON string as is (similar 
to start/end offsets). However, I agree we can add error reporting to this API. 
Will address.


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-05 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r200538008
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
 ---
@@ -95,4 +95,25 @@ private object JsonUtils {
 }
 Serialization.write(result)
   }
+
+  /**
+   * Write per-topic partition lag as json string
--- End diff --

Is "lag" here just the difference (at the time a batch ends) between the 
last offset Spark knows about and the last offset Spark has processed? I'm not 
sure this is super useful to know. If maxOffsets isn't set it's always going to 
be 0, no matter how far Spark gets behind the Kafka cluster.


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-05 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r200537533
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -379,3 +384,16 @@ private[kafka010] case class 
KafkaMicroBatchInputPartitionReader(
 }
   }
 }
+
+// Currently reports per topic-partition lag.
--- End diff --

nit: javadoc style for top level comments


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-05 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r200537276
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsCustomMetrics.java
 ---
@@ -0,0 +1,30 @@
+/*
--- End diff --

This should probably be in v2/reader/streaming.


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-05 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r200537454
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -178,12 +180,18 @@ class SourceProgress protected[sql](
   if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
 }
 
-("description" -> JString(description)) ~
+val jsonVal = ("description" -> JString(description)) ~
   ("startOffset" -> tryParse(startOffset)) ~
   ("endOffset" -> tryParse(endOffset)) ~
   ("numInputRows" -> JInt(numInputRows)) ~
   ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
   ("processedRowsPerSecond" -> 
safeDoubleToJValue(processedRowsPerSecond))
+
+if (customMetrics != null) {
+  jsonVal ~ ("customMetrics" -> tryParse(customMetrics.json()))
--- End diff --

Is there any way to get an error to the user if their custom metrics fail 
to parse? I'm not entirely sure that's the right thing to do, but I worry that 
it'll be hard to develop against this API if we just silently drop malformed 
metrics.
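One possible shape for surfacing parse failures instead of silently dropping them is sketched below. This is an assumption about how the API could evolve, not the current `SourceProgress.tryParse`: `parseJson` stands in for json4s's `parse()`, and the error callback name is hypothetical.

```scala
import scala.util.{Failure, Success, Try}

// Parse the custom metrics JSON; on failure, invoke an error callback
// so the developer sees what failed, and fall back to the raw string
// (which is what progress reporting does today for start/end offsets).
def tryParseWithReporting(
    raw: String,
    parseJson: String => Any,
    onError: Throwable => Unit): Any =
  Try(parseJson(raw)) match {
    case Success(parsed) => parsed
    case Failure(e) =>
      onError(e)  // surface the failure instead of swallowing it
      raw         // keep the raw string so nothing is silently lost
  }
```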


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-05 Thread arunmahadevan
GitHub user arunmahadevan opened a pull request:

https://github.com/apache/spark/pull/21721

[SPARK-24748][SS] Support for reporting custom metrics via StreamingQuery 
Progress

## What changes were proposed in this pull request?

Currently the Structured Streaming sources and sinks do not have a way to 
report custom metrics. Providing an option to report custom metrics and making 
them available via the Streaming Query progress can enable sources and sinks to 
report custom progress information (e.g. the lag metrics for the Kafka source).

Similar metrics can be reported for sinks as well, but I would like to get 
initial feedback before proceeding further.

## How was this patch tested?

New and existing unit tests.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/arunmahadevan/spark SPARK-24748

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21721.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21721


commit b7b2c3b1c9242fe205869f108548248f71ff8203
Author: Arun Mahadevan 
Date:   2018-07-06T01:51:50Z

[SPARK-24748][SS] Support for reporting custom metrics via Streaming Query 
Progress




---
