[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...

2018-12-06 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/23156
  
@gaborgsomogyi what I meant was that rather than exposing a config to control 
the internal queue sizes, we could have a higher-level config like the maximum 
number of pending epochs. This would act as a back-pressure mechanism to stop 
further processing until the pending epochs are committed. I assume this would 
also put a limit on the three queues.
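
To illustrate the kind of gate such a config could drive, here is a minimal 
sketch (the class and `maxPendingEpochs` names are made up for illustration, 
not from the PR):

```scala
import java.util.concurrent.Semaphore

// Illustrative only: a counting semaphore sized by a hypothetical
// "max pending epochs" setting. startEpoch() blocks once the limit of
// uncommitted epochs is reached; commitEpoch() frees a slot when an epoch
// is committed, letting processing resume.
class EpochBackpressure(maxPendingEpochs: Int) {
  private val slots = new Semaphore(maxPendingEpochs)

  def startEpoch(): Unit = slots.acquire()
  def commitEpoch(): Unit = slots.release()
}
```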


---




[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...

2018-12-05 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/23156
  
Rather than controlling the queue sizes, it would be better to limit the 
maximum epoch backlog and fail the query once that threshold is reached. There 
already seems to be a patch that attempted to address this: 
https://github.com/apache/spark/pull/21392
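
A sketch of the fail-fast behaviour being suggested (the threshold name and 
exception are assumptions, not taken from either PR):

```scala
// Illustrative only: fail the query once the number of uncommitted epochs
// crosses a configured threshold instead of letting the queues grow unbounded.
def assertEpochBacklogWithinLimit(pendingEpochs: Int, maxEpochBacklog: Int): Unit = {
  if (pendingEpochs > maxEpochBacklog) {
    throw new IllegalStateException(
      s"Epoch backlog ($pendingEpochs) exceeds the configured maximum " +
        s"($maxEpochBacklog); failing the query.")
  }
}
```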


---




[GitHub] spark issue #22598: [SPARK-25501][SS] Add kafka delegation token support.

2018-11-06 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/22598
  
+1, LGTM.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-26 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r228681974
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
 ---
@@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager(
   private def getDelegationTokenProviders: Map[String, 
HadoopDelegationTokenProvider] = {
 val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) 
++
   safeCreateProvider(new HiveDelegationTokenProvider) ++
-  safeCreateProvider(new HBaseDelegationTokenProvider)
+  safeCreateProvider(new HBaseDelegationTokenProvider) ++
+  safeCreateProvider(new KafkaDelegationTokenProvider)
--- End diff --

Yes, I think the best we can do is to document the configs and throw some 
useful error messages to make the user aware of the "bootstrapservers" config 
(in case they accidentally left it set) when the spark-sql-kafka libraries are 
not in the classpath.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-26 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r228671263
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
 ---
@@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager(
   private def getDelegationTokenProviders: Map[String, 
HadoopDelegationTokenProvider] = {
 val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) 
++
   safeCreateProvider(new HiveDelegationTokenProvider) ++
-  safeCreateProvider(new HBaseDelegationTokenProvider)
+  safeCreateProvider(new HBaseDelegationTokenProvider) ++
+  safeCreateProvider(new KafkaDelegationTokenProvider)
--- End diff --

Why I thought disabling by default might make sense:

The token fetch would be attempted if just "spark.kafka.bootstrap.servers" 
is defined. And if this config is set, the spark-sql-kafka libraries need to be 
on the classpath as well. It would be better to mention these in the docs.

We could also consider prefixing all the configs with 
spark.security.credentials.kafka instead of spark.kafka (like 
spark.security.credentials.kafka.bootstrap.servers) to make it explicit that 
these are security-related settings required for fetching Kafka delegation 
tokens.


---




[GitHub] spark issue #22824: [SPARK-25834] [Structured Streaming]Update Mode should n...

2018-10-26 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/22824
  
I mean it's easy to miss the check if a new "case" is added and "update" mode 
is not supported there. Even now, what about LeftSemi, LeftAnti, FullOuter, etc.?


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-26 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r228583252
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  val mirror = 
universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+  val obtainToken = mirror.classLoader.
+loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+getMethod("obtainToken", classOf[SparkConf])
--- End diff --

spark-core does not have a dependency on spark-sql-kafka, so this is needed.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-25 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r228339774
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  val mirror = 
universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+  val obtainToken = mirror.classLoader.
+loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+getMethod("obtainToken", classOf[SparkConf])
+
+  logDebug("Attempting to fetch Kafka security token.")
+  val token = obtainToken.invoke(null, sparkConf)
+.asInstanceOf[Token[_ <: TokenIdentifier]]
+  creds.addToken(token.getService, token)
+} catch {
+  case NonFatal(e) =>
+logInfo(s"Failed to get token from service $serviceName", e)
+}
+
+None
--- End diff --

Shouldn't this return the time of the next renewal? Otherwise, how does the 
token manager know when it should be renewed or recreated?
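
For illustration only, a hedged sketch of how the quoted method could surface 
a renewal time instead of always returning `None`; `obtainToken` stands in for 
the reflective `TokenUtil` call in the diff above and `nextRenewalDate` is a 
hypothetical helper, not code from the PR:

```scala
// Illustrative fragment only: return the next renewal time so the token
// manager can schedule when to renew or re-obtain the token.
override def obtainDelegationTokens(
    hadoopConf: Configuration,
    sparkConf: SparkConf,
    creds: Credentials): Option[Long] = {
  try {
    val token = obtainToken(sparkConf)   // the reflective TokenUtil call above
    creds.addToken(token.getService, token)
    Some(nextRenewalDate(token))         // hypothetical: expiry minus a safety margin
  } catch {
    case NonFatal(e) =>
      logInfo(s"Failed to get token from service $serviceName", e)
      None
  }
}
```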


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-25 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r228321793
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
 ---
@@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager(
   private def getDelegationTokenProviders: Map[String, 
HadoopDelegationTokenProvider] = {
 val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) 
++
   safeCreateProvider(new HiveDelegationTokenProvider) ++
-  safeCreateProvider(new HBaseDelegationTokenProvider)
+  safeCreateProvider(new HBaseDelegationTokenProvider) ++
+  safeCreateProvider(new KafkaDelegationTokenProvider)
--- End diff --

Update the class docs to state which providers are loaded by default, or 
better, set the default for `spark.security.credentials.kafka.enabled` to false.


---




[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-25 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r228320944
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -647,4 +647,42 @@ package object config {
   .stringConf
   .toSequence
   .createWithDefault(Nil)
+
+  private[spark] val KAFKA_DELEGATION_TOKEN_ENABLED =
+ConfigBuilder("spark.kafka.delegation.token.enabled")
+  .doc("Set to 'true' for obtaining delegation token from kafka.")
+  .booleanConf
+  .createWithDefault(false)
+
+  private[spark] val KAFKA_BOOTSTRAP_SERVERS =
+ConfigBuilder("spark.kafka.bootstrap.servers")
--- End diff --

Given the defaults, the token fetch would be attempted if only 
`spark.kafka.bootstrap.servers` is defined, right? And the spark-sql-kafka 
libraries need to be on the classpath as well?

It would be better to mention these in the docs, and to make 
`spark.security.credentials.kafka.enabled` default to false if that makes sense.

Also consider prefixing all the configs with 
`spark.security.credentials.kafka` instead of `spark.kafka` (like 
`spark.security.credentials.kafka.bootstrap.servers`) to make it explicit that 
these are security-related settings required for fetching Kafka delegation 
tokens.
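
Concretely, the rename would look something like the sketch below 
(illustrative only, not the PR's code; the entry in the quoted diff uses the 
`spark.kafka` prefix):

```scala
// Illustrative only: the same config entry under a security-scoped prefix,
// making it obvious the setting exists for fetching Kafka delegation tokens.
private[spark] val KAFKA_BOOTSTRAP_SERVERS =
  ConfigBuilder("spark.security.credentials.kafka.bootstrap.servers")
    .doc("Kafka bootstrap servers to contact when obtaining delegation tokens.")
    .stringConf
    .createOptional
```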




---




[GitHub] spark issue #22824: [SPARK-25834] [Structured Streaming]Update Mode should n...

2018-10-25 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/22824
  
It may be enough to do the check just once rather than repeating similar 
checks for inner, leftOuter and rightOuter. For example, have a single check 
before the `joinType match {` clause.
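
A self-contained sketch of the idea; the set of join types supported in update 
mode and the exception are assumptions for illustration, not the PR's actual 
logic:

```scala
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.streaming.OutputMode

// Illustrative only: one guard before the match instead of a per-case check,
// so a newly added join type cannot silently skip the output-mode validation.
def checkStreamingJoin(joinType: JoinType, outputMode: OutputMode): Unit = {
  val supportedInUpdateMode: Set[JoinType] = Set(Inner)  // assumption for the sketch
  if (outputMode == OutputMode.Update() && !supportedInUpdateMode.contains(joinType)) {
    throw new UnsupportedOperationException(
      s"$joinType join is not supported in $outputMode output mode")
  }
  joinType match {
    case Inner =>                  // existing inner-join handling
    case LeftOuter | RightOuter => // existing outer-join handling
    case _ =>                      // other join types
  }
}
```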




---




[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively

2018-09-20 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/22482
  
+1 for the idea to provide native session window support.

On the approach, it would be ideal if all windowing aggregations could be 
handled via a single plan and state store (vs. the separate plan and state 
store the patch proposes for session windows). The underlying steps are more or 
less the same for fixed, session and sliding windows; the sort/merge operations 
would have to be part of a window merge function rather than the plan itself:

(K, Values) -> AssignWindows (produces [k, v, timestamp, window]) -> 
GroupByKey (shuffle) -> MergeWindows (optional step) -> GroupWindows -> 
aggregate values.

Based on how we want to approach it, it could be handled now or as a 
follow-up item (with major refactoring).
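
To make the "window merge function" step concrete, here is a minimal, 
self-contained sketch over plain Scala collections that treats session windows 
as assign-then-merge; the gap parameter and names are illustrative, not part of 
the proposed plan changes:

```scala
// Illustrative only: assign each event its own [ts, ts + gap) window, group by
// key, then merge overlapping windows - the MergeWindows step is the only part
// that differs between fixed, sliding and session windows.
case class Window(start: Long, end: Long)

def sessionize(events: Seq[(String, Long)], gapMs: Long): Map[String, Seq[Window]] = {
  events
    .map { case (key, ts) => (key, Window(ts, ts + gapMs)) }   // AssignWindows
    .groupBy { case (key, _) => key }                          // GroupByKey
    .mapValues { assigned =>                                   // MergeWindows
      assigned.map(_._2).sortBy(_.start).foldLeft(List.empty[Window]) {
        case (merged :+ last, w) if w.start <= last.end =>
          merged :+ Window(last.start, math.max(last.end, w.end))
        case (acc, w) => acc :+ w
      }
    }
}
```

For example, `sessionize(Seq(("a", 0L), ("a", 5000L), ("a", 20000L)), gapMs = 10000L)` 
merges the first two events into one [0, 15000) session for key "a" and keeps 
[20000, 30000) as a separate one.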



---




[GitHub] spark pull request #22299: [SPARK-24748][SS][FOLLOWUP] Switch custom metrics...

2018-09-05 Thread arunmahadevan
Github user arunmahadevan closed the pull request at:

https://github.com/apache/spark/pull/22299


---




[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-08-31 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21721
  
>It seems like its life cycle should be bound to an epoch, but 
unfortunately we don't have such an interface in continuous streaming to 
represent an epoch. Is it possible that we may end up with 2 sets of custom 
metrics APIs for micro-batch and continuous?

@cloud-fan we could still report progress at the end of each epoch (e.g. 
[here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala#L231)
 and via the EpochCoordinator). There need not be separate interfaces for the 
progress or the custom metrics; only the mechanisms could be different.


---




[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-08-31 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21721
  
I created a follow-up PR to move CustomMetrics (and a few other 
streaming-specific interfaces in that package) to 'streaming' and mark the 
interfaces as Unstable here: https://github.com/apache/spark/pull/22299


---




[GitHub] spark pull request #22299: [SPARK-24748][SS][FOLLOWUP] Switch custom metrics...

2018-08-31 Thread arunmahadevan
GitHub user arunmahadevan opened a pull request:

https://github.com/apache/spark/pull/22299

[SPARK-24748][SS][FOLLOWUP] Switch custom metrics to Unstable APIs

- Mark custom-metrics-related APIs as unstable
- Move CustomMetrics (and a few other streaming interfaces in the parent 
package) to the streaming package

Ideally we could move `v2/reader/streaming` and `v2/writer/streaming` under 
`streaming/reader` and `streaming/writer`, but that can be a follow-up PR if 
required.

## How was this patch tested?
Existing unit tests

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/arunmahadevan/spark refactor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22299.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22299


commit 49a94c6016a0a4cd6076329797f4c2ac5a9cb588
Author: Arun Mahadevan 
Date:   2018-08-31T05:53:57Z

[SPARK-24748][SS][FOLLOWUP] Switch custom metrics to Unstable APIs

- Mark custom metrics related APIs as unstable
- Move streaming related interfaces to streaming package




---




[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-08-30 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21721
  
@rxin it's for streaming sources and sinks, as explained in the [doc](
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java#L23).

It had to be shared between classes in reader.streaming and 
writer.streaming, so it was added in the parent package, similar to other 
streaming-specific classes that exist there, like 
[StreamingWriteSupportProvider.java](https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java)
 and 
[MicroBatchReadSupportProvider.java](https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupportProvider.java).

We could move all of it to a streaming package.


---




[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-08-30 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21721
  
@HyukjinKwon yes, we can mark it unstable. As I mentioned multiple times in 
previous comments, the traits added here, like CustomMetrics, 
SupportsCustomReaderMetrics, etc., have nothing specific to micro-batch or 
continuous mode and are unaffected when we finally start reporting progress for 
continuous mode. The way to collect and report metrics in continuous mode still 
needs to be figured out, and I think that should be discussed in the respective 
JIRAs.


---




[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-08-30 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21721
  
There are many unknowns to be figured out for continuous mode. Though the 
way to capture the metrics would be different for continuous execution, the 
interface of what's reported is not expected to change. Given that we already 
report progress for micro-batch, that as a user of Spark the changes in the 
patch are quite useful for reporting custom metrics for what works right now, 
and that it does not impact other parts of the DataSourceV2 APIs (only the 
sources that want to report custom metrics would add the traits), IMO we can 
keep this and continue to investigate, in a time-bound manner, how to capture 
metrics for continuous mode.


---




[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-08-30 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21721
  
The CustomMetrics are traits which can be mixed in if necessary (see 
https://github.com/apache/spark/pull/21721#issuecomment-403878383) and do not 
affect any other API as such. When query progress is supported for continuous 
mode, changes can be made if necessary. I can further investigate how to 
report query progress for continuous mode.


---




[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-08-30 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21721
  
@zsxwing @gatorsmile, this PR does not add new APIs as such. It builds on 
the existing StreamingQueryProgress and adds custom metrics to it. 
StreamingQueryProgress as such is not reported for continuous mode; when it is 
reported, this would be part of that. Are you proposing not to report 
StreamingQueryProgress at all for micro-batch until things are figured out for 
continuous mode? Otherwise I don't see how adding this would break things.

cc @jose-torres 


---




[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-08-30 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21721
  
For continuous queries, the progress could still be reported by posting a 
QueryProgressEvent to the listener for each epoch (instead of each 
micro-batch). The `StreamingQueryProgress` could also mostly stay the same.

I am not clear on how the continuous query would get the metric updates. We 
may need some mechanism to post metric updates while the query continues to 
run; right now the SQL metrics rely on accumulators, and the accumulators might 
not be updated until the task completes.


---




[GitHub] spark pull request #22251: [SPARK-25260][SQL] Fix namespace handling in Sche...

2018-08-28 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22251#discussion_r213407599
  
--- Diff: 
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -1099,6 +1098,27 @@ class AvroSuite extends QueryTest with 
SharedSQLContext with SQLTestUtils {
 }
   }
 
+  test("check namespace - toAvroType") {
+val sparkSchema = StructType(Seq(
+  StructField("name", StringType, nullable = false),
+  StructField("address", StructType(Seq(
+StructField("city", StringType, nullable = false),
+StructField("state", StringType, nullable = false))),
+nullable = false)))
+val employeeType = SchemaConverters.toAvroType(sparkSchema,
+  recordName = "employee",
+  nameSpace = "foo.bar")
--- End diff --

Added a test case for toAvroType with an empty namespace.


---




[GitHub] spark pull request #22251: [SPARK-25260][SQL] Fix namespace handling in Sche...

2018-08-28 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22251#discussion_r213407441
  
--- Diff: 
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -1099,6 +1098,27 @@ class AvroSuite extends QueryTest with 
SharedSQLContext with SQLTestUtils {
 }
   }
 
+  test("check namespace - toAvroType") {
--- End diff --

It's sort of covered by the existing cases below. Do you think we need more?

[Validate namespace in avro file that has nested records with the same 
name](https://github.com/apache/spark/blob/master/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala#L1078)
[conversion to avro and back with 
namespace](https://github.com/apache/spark/blob/master/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala#L510)


---




[GitHub] spark issue #22251: [SPARK-25260][SQL] Fix namespace handling in SchemaConve...

2018-08-28 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/22251
  
cc @gengliangwang @dongjoon-hyun


---




[GitHub] spark pull request #22251: [SPARK-25260][SQL] Fix namespace handling in Sche...

2018-08-28 Thread arunmahadevan
GitHub user arunmahadevan opened a pull request:

https://github.com/apache/spark/pull/22251

[SPARK-25260][SQL] Fix namespace handling in SchemaConverters.toAvroType

## What changes were proposed in this pull request?

`toAvroType` converts a Spark data type to an Avro schema. It always appends 
the record name to the namespace, so it is impossible to have an Avro namespace 
independent of the record name.

When invoked with a Spark data type like,

```scala
val sparkSchema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("address", StructType(Seq(
    StructField("city", StringType, nullable = false),
    StructField("state", StringType, nullable = false))),
    nullable = false)))

// map it to an avro schema with record name "employee" and top level namespace "foo.bar",
val avroSchema = SchemaConverters.toAvroType(sparkSchema, false, "employee", "foo.bar")

// result is
// avroSchema.getName = employee
// avroSchema.getNamespace = foo.bar.employee
// avroSchema.getFullname = foo.bar.employee.employee
```
The patch proposes to fix this so that the result is

```
avroSchema.getName = employee
avroSchema.getNamespace = foo.bar
avroSchema.getFullname = foo.bar.employee
```
## How was this patch tested?

New and existing unit tests.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/arunmahadevan/spark avro-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22251.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22251


commit f47483951e12d563b7696940a2cfc2fdc3b27ab2
Author: Arun Mahadevan 
Date:   2018-08-28T08:00:17Z

[SPARK-25260][SQL] Fix namespace handling in SchemaConverters.toAvroType




---




[GitHub] spark pull request #22238: [SPARK-25245][DOCS][SS] Explain regarding limitin...

2018-08-27 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22238#discussion_r213049895
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -2812,6 +2812,12 @@ See [Input Sources](#input-sources) and [Output 
Sinks](#output-sinks) sections f
 
 # Additional Information
 
+**Gotchas**
--- End diff --

IMO, it would be better to keep it here as well as in the code; we may not 
be able to surface it in the right API docs, and there is a chance users will 
ignore it.

@HeartSaVioR, maybe add an example here to illustrate how to use coalesce, 
along the lines of the sketch below?
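
Something as small as the following would probably do (a sketch assuming a 
file-based sink, with the input DataFrame and paths as placeholders):

```scala
// Illustrative only: coalesce before the sink to cap the number of files
// written per micro-batch.
val query = inputDF
  .coalesce(2)                      // at most 2 output files per micro-batch
  .writeStream
  .format("parquet")
  .option("path", "/tmp/output")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
```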



---




[GitHub] spark pull request #22121: [SPARK-25133][SQL][Doc]Avro data source guide

2018-08-22 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22121#discussion_r212031015
  
--- Diff: docs/avro-data-source-guide.md ---
@@ -0,0 +1,377 @@
+---
+layout: global
+title: Apache Avro Data Source Guide
+---
+
+* This will become a table of contents (this text will be scraped).
+{:toc}
+
+Since Spark 2.4 release, [Spark 
SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) provides 
built-in support for reading and writing Apache Avro data.
+
+## Deploying
+The `spark-avro` module is external and not included in `spark-submit` or 
`spark-shell` by default.
+
+As with any Spark applications, `spark-submit` is used to launch your 
application. `spark-avro_{{site.SCALA_BINARY_VERSION}}`
+and its dependencies can be directly added to `spark-submit` using 
`--packages`, such as,
+
+./bin/spark-submit --packages 
org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 ...
+
+For experimenting on `spark-shell`, you can also use `--packages` to add 
`org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}` and its 
dependencies directly,
+
+./bin/spark-shell --packages 
org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 ...
+
+See [Application Submission Guide](submitting-applications.html) for more 
details about submitting applications with external dependencies.
+
+## Load and Save Functions
+
+Since `spark-avro` module is external, there is no `.avro` API in 
+`DataFrameReader` or `DataFrameWriter`.
+
+To load/save data in Avro format, you need to specify the data source 
option `format` as `avro`(or `org.apache.spark.sql.avro`).
+
+
+{% highlight scala %}
+
+val usersDF = 
spark.read.format("avro").load("examples/src/main/resources/users.avro")
+usersDF.select("name", 
"favorite_color").write.format("avro").save("namesAndFavColors.avro")
+
+{% endhighlight %}
+
+
+{% highlight java %}
+
+Dataset usersDF = 
spark.read().format("avro").load("examples/src/main/resources/users.avro");
+usersDF.select("name", 
"favorite_color").write().format("avro").save("namesAndFavColors.avro");
+
+{% endhighlight %}
+
+
+{% highlight python %}
+
+df = 
spark.read.format("avro").load("examples/src/main/resources/users.avro")
+df.select("name", 
"favorite_color").write.format("avro").save("namesAndFavColors.avro")
+
+{% endhighlight %}
+
+
+{% highlight r %}
+
+df <- read.df("examples/src/main/resources/users.avro", "avro")
+write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", 
"avro")
+
+{% endhighlight %}
+
+
+
+## to_avro() and from_avro()
+Spark SQL provides function `to_avro` to encode a struct as a string and 
`from_avro()` to retrieve the struct as a complex type.
--- End diff --

Does it need to be a struct, or can it be any Spark SQL type?
Maybe: `to_avro` to encode Spark SQL types as Avro bytes and `from_avro` to 
retrieve Avro bytes as Spark SQL types?
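
A hedged example of that broader wording, assuming the Spark 2.4 placement of 
these functions in `org.apache.spark.sql.avro` and a DataFrame `df` with 
`name` and `favorite_color` columns like the users data above (the JSON-format 
Avro schema below is made up for illustration):

```scala
import org.apache.spark.sql.avro.{from_avro, to_avro}
import org.apache.spark.sql.functions.{col, struct}

// Illustrative only: to_avro encodes a column (here a struct of two fields)
// as Avro binary; from_avro decodes it back using a JSON-format Avro schema.
val avroDF = df.select(to_avro(struct(col("name"), col("favorite_color"))).as("value"))

val jsonFormatSchema =
  """{"type":"record","name":"user","fields":[
    |  {"name":"name","type":"string"},
    |  {"name":"favorite_color","type":["string","null"]}]}""".stripMargin

val decodedDF = avroDF.select(from_avro(col("value"), jsonFormatSchema).as("user"))
```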


---




[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...

2018-08-20 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22143#discussion_r211339535
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
 ---
@@ -61,12 +63,30 @@ private[kafka010] class KafkaWriteTask(
 private[kafka010] abstract class KafkaRowWriter(
 inputSchema: Seq[Attribute], topic: Option[String]) {
 
+  import scala.collection.JavaConverters._
+
+  protected val minOffsetAccumulator: 
collection.concurrent.Map[TopicPartition, Long] =
+new ConcurrentHashMap[TopicPartition, Long]().asScala
--- End diff --

Why is this a concurrent map?


---




[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...

2018-08-20 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22143#discussion_r211336988
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
 ---
@@ -116,3 +133,66 @@ class KafkaStreamDataWriter(
 }
   }
 }
+
+private[kafka010] case class KafkaWriterCustomMetrics(
+minOffset: KafkaSourceOffset,
+maxOffset: KafkaSourceOffset) extends CustomMetrics {
+  override def json(): String = {
+val jsonVal = ("minOffset" -> parse(minOffset.json)) ~
+  ("maxOffset" -> parse(maxOffset.json))
+compact(render(jsonVal))
+  }
+
+  override def toString: String = json()
+}
+
+private[kafka010] object KafkaWriterCustomMetrics {
+
+  import Math.{min, max}
+
+  def apply(messages: Array[WriterCommitMessage]): 
KafkaWriterCustomMetrics = {
+val minMax = collate(messages)
+KafkaWriterCustomMetrics(minMax._1, minMax._2)
+  }
+
+  private def collate(messages: Array[WriterCommitMessage]):
--- End diff --

It would be good to leave a comment on what this does. It seems to be 
computing the min/max offset per partition? If so, choosing an apt name for 
the function would make it clearer.
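
For instance, a sketch with assumed names (not the PR's code) that makes the 
per-partition min/max intent explicit:

```scala
import org.apache.kafka.common.TopicPartition

// Illustrative only: given the offsets reported by each write task, compute
// the minimum and maximum offset observed for every topic-partition.
def minMaxOffsetsPerPartition(
    offsetsPerTask: Seq[Map[TopicPartition, Long]]): Map[TopicPartition, (Long, Long)] = {
  offsetsPerTask
    .flatMap(_.toSeq)
    .groupBy { case (tp, _) => tp }
    .mapValues { tpOffsets =>
      val offsets = tpOffsets.map { case (_, offset) => offset }
      (offsets.min, offsets.max)
    }
}
```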


---




[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...

2018-08-20 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22143#discussion_r211336368
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
 ---
@@ -19,18 +19,23 @@ package org.apache.spark.sql.kafka010
 
 import scala.collection.JavaConverters._
 
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
+import org.apache.spark.sql.sources.v2.CustomMetrics
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamWriter, 
SupportsCustomWriterMetrics}
 import org.apache.spark.sql.types.StructType
 
 /**
  * Dummy commit message. The DataSourceV2 framework requires a commit 
message implementation but we
  * don't need to really send one.
  */
-case object KafkaWriterCommitMessage extends WriterCommitMessage
+case class KafkaWriterCommitMessage(minOffset: KafkaSourceOffset, 
maxOffset: KafkaSourceOffset)
--- End diff --

It's kind of odd that the writer commit message includes a source offset. IMO, 
it would be better to define a `KafkaSinkOffset`, or, if it can be common, 
something like `KafkaOffsets`.


---




[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress

2018-08-16 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21919
  
LGTM overall except one minor comment.


---




[GitHub] spark pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkPro...

2018-08-16 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21919#discussion_r210707152
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -254,3 +259,10 @@ class SinkProgress protected[sql](
 }
   }
 }
+
+private[sql] object SinkProgress {
+  val DEFAULT_NUM_OUTPUT_ROWS: Long = -1L
--- End diff --

Does it result in sink progress output with "numOutputRows = -1"? Maybe 
add numOutputRows to the output only if the value is not the default.
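
A sketch of one way to do that with json4s (names are assumed, not taken from 
the PR):

```scala
import org.json4s.JsonAST.{JInt, JObject, JString}
import org.json4s.jackson.JsonMethods.{compact, render}

// Illustrative only: emit "numOutputRows" in the progress JSON only when the
// sink actually reported a count, i.e. the value is not the -1 sentinel.
def sinkProgressJson(description: String, numOutputRows: Long): String = {
  val base = List("description" -> JString(description))
  val fields =
    if (numOutputRows < 0) base
    else base :+ ("numOutputRows" -> JInt(numOutputRows))
  compact(render(JObject(fields: _*)))
}
```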


---




[GitHub] spark issue #21819: [SPARK-24863][SS] Report Kafka offset lag as a custom me...

2018-08-16 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21819
  
@HyukjinKwon, can you take it forward? I appreciate your effort, and thanks 
in advance.


---




[GitHub] spark pull request #21819: [SPARK-24863][SS] Report Kafka offset lag as a cu...

2018-08-13 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21819#discussion_r209699459
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
 ---
@@ -95,4 +95,20 @@ private object JsonUtils {
 }
 Serialization.write(result)
   }
+
+  /**
+   * Write per-topic partition lag as json string
+   */
+  def partitionLags(latestOffsets: Map[TopicPartition, Long],
+processedOffsets: Map[TopicPartition, Long]): String = 
{
+val result = new HashMap[String, HashMap[Int, Long]]()
--- End diff --

I had followed the style used in other parts of the class. Addressed, and 
refactored the other places as well.


---




[GitHub] spark pull request #21819: [SPARK-24863][SS] Report Kafka offset lag as a cu...

2018-08-13 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21819#discussion_r209699010
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
 ---
@@ -95,4 +95,20 @@ private object JsonUtils {
 }
 Serialization.write(result)
   }
+
+  /**
+   * Write per-topic partition lag as json string
+   */
+  def partitionLags(latestOffsets: Map[TopicPartition, Long],
+processedOffsets: Map[TopicPartition, Long]): String = 
{
--- End diff --

Addressed. Would it be possible to add this to the Scala style checks?


---




[GitHub] spark issue #21819: [SPARK-24863][SS] Report Kafka offset lag as a custom me...

2018-08-10 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21819
  
@HeartSaVioR @HyukjinKwon @jose-torres @tdas  would you mind taking a look?


---




[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

2018-08-07 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21199
  
@HyukjinKwon this has been open for a while, would you mind taking this 
forward?


---




[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-08-06 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r208106031
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 ---
@@ -196,6 +237,18 @@ trait ProgressReporter extends Logging {
 currentStatus = currentStatus.copy(isTriggerActive = false)
   }
 
+  /** Extract writer from the executed query plan. */
+  private def dataSourceWriter: Option[DataSourceWriter] = {
+if (lastExecution == null) return None
+lastExecution.executedPlan.collect {
+  case p if p.isInstanceOf[WriteToDataSourceV2Exec] =>
--- End diff --

Yes, currently the progress is reported only for micro-batch mode. This 
should be supported for continuous mode as well once we start reporting 
progress there, but it needs some more work: 
https://issues.apache.org/jira/browse/SPARK-23887


---




[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

2018-08-06 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21199
  
retest this please


---




[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-08-06 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21721
  
@HyukjinKwon, the master code changed and I had to rebase and fix issues. 
Can you take it forward? There seem to be unrelated test failures in the Kafka 
0.10 integration suite.


---




[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

2018-08-03 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21199
  
retest this please


---




[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-08-03 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21721
  
retest this please


---




[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress v...

2018-08-02 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21919
  
`numOutputRows` makes sense for all sinks, but I agree the counting should 
be done by the framework and not by individual sinks. Metrics that do not 
apply to all sinks could be reported as custom metrics if the sinks want to. 
Here's a proposal to collect and report custom metrics for sources and 
sinks: https://github.com/apache/spark/pull/21721


---




[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-08-02 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21721
  
The tests keep failing and the failures look unrelated. @HyukjinKwon, let me 
know if you think there's something I should look into.


---




[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

2018-08-02 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21199
  
retest this please


---




[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

2018-08-02 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21199#discussion_r207237894
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
 ---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.{Calendar, List => JList, Locale}
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.json4s.{DefaultFormats, NoTypeHints}
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, 
ContinuousRecordPartitionOffset, GetRecord}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
InputPartitionReader, SupportsDeprecatedScanRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader,
 ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+import org.apache.spark.util.RpcUtils
+
+
+object TextSocketContinuousReader {
+  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
+  val SCHEMA_TIMESTAMP = StructType(
+StructField("value", StringType)
+  :: StructField("timestamp", TimestampType) :: Nil)
+  val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US)
+}
+
+/**
+ * A ContinuousReader that reads text lines through a TCP socket, designed 
only for tutorials and
+ * debugging. This ContinuousReader will *not* work in production 
applications due to multiple
+ * reasons, including no support for fault recovery.
+ *
+ * The driver maintains a socket connection to the host-port, keeps the 
received messages in
+ * buckets and serves the messages to the executors via a RPC endpoint.
+ */
+class TextSocketContinuousReader(options: DataSourceOptions) extends 
ContinuousReader
+  with SupportsDeprecatedScanRow with Logging {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  private val host: String = options.get("host").get()
+  private val port: Int = options.get("port").get().toInt
+
+  assert(SparkSession.getActiveSession.isDefined)
+  private val spark = SparkSession.getActiveSession.get
+  private val numPartitions = spark.sparkContext.defaultParallelism
+
+  @GuardedBy("this")
+  private var socket: Socket = _
+
+  @GuardedBy("this")
+  private var readThread: Thread = _
+
+  @GuardedBy("this")
+  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, 
Timestamp)])
+
+  @GuardedBy("this")
+  private var currentOffset: Int = -1
+
+  private var startOffset: TextSocketOffset = _
+
+  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
+  @volatile private var endpointRef: RpcEndpointRef = _
+
+  initialize()
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val offs = offsets
+  .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
+  .sortBy(_.partitionId)
+  .map(_.offset)
+  .toList
+TextSocketOffset(offs)
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+TextSocketOffset(Seria

[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-08-02 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r207232187
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -163,7 +163,27 @@ class SourceProgress protected[sql](
   val endOffset: String,
   val numInputRows: Long,
   val inputRowsPerSecond: Double,
-  val processedRowsPerSecond: Double) extends Serializable {
+  val processedRowsPerSecond: Double,
+  val customMetrics: String) extends Serializable {
+
+  /** SourceProgress without custom metrics. */
+  def this(
--- End diff --

changed.


---




[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

2018-08-01 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21199
  
@HeartSaVioR, addressed your comments. Let me know if I missed something. 
I also rebased and had to change more code to use the new interfaces.

I hope we can speed up the review cycles in general, rather than leaving PRs 
in hibernation for a while, after which the developer loses the context and 
other things have changed in the meanwhile.


---




[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

2018-08-01 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21199#discussion_r207078242
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
 ---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.{Calendar, List => JList, Locale}
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.json4s.{DefaultFormats, NoTypeHints}
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, 
ContinuousRecordPartitionOffset, GetRecord}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
InputPartitionReader, SupportsDeprecatedScanRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader,
 ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+import org.apache.spark.util.RpcUtils
+
+
+object TextSocketContinuousReader {
+  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
+  val SCHEMA_TIMESTAMP = StructType(
+StructField("value", StringType)
+  :: StructField("timestamp", TimestampType) :: Nil)
+  val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US)
+}
+
+/**
+ * A ContinuousReader that reads text lines through a TCP socket, designed 
only for tutorials and
+ * debugging. This ContinuousReader will *not* work in production 
applications due to multiple
+ * reasons, including no support for fault recovery.
+ *
+ * The driver maintains a socket connection to the host-port, keeps the 
received messages in
+ * buckets and serves the messages to the executors via a RPC endpoint.
+ */
+class TextSocketContinuousReader(options: DataSourceOptions) extends 
ContinuousReader
+  with SupportsDeprecatedScanRow with Logging {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  private val host: String = options.get("host").get()
+  private val port: Int = options.get("port").get().toInt
+
+  assert(SparkSession.getActiveSession.isDefined)
+  private val spark = SparkSession.getActiveSession.get
+  private val numPartitions = spark.sparkContext.defaultParallelism
+
+  @GuardedBy("this")
+  private var socket: Socket = _
+
+  @GuardedBy("this")
+  private var readThread: Thread = _
+
+  @GuardedBy("this")
+  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, 
Timestamp)])
+
+  @GuardedBy("this")
+  private var currentOffset: Int = -1
+
+  private var startOffset: TextSocketOffset = _
+
+  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
+  @volatile private var endpointRef: RpcEndpointRef = _
+
+  initialize()
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val offs = offsets
+  .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
+  .sortBy(_.partitionId)
+  .map(_.offset)
+  .toList
+TextSocketOffset(offs)
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+TextSocketOffset(Seria

[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

2018-08-01 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21199#discussion_r207078263
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
 ---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.{Calendar, List => JList, Locale}
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.json4s.{DefaultFormats, NoTypeHints}
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, 
ContinuousRecordPartitionOffset, GetRecord}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
InputPartitionReader, SupportsDeprecatedScanRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader,
 ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+import org.apache.spark.util.RpcUtils
+
+
+object TextSocketContinuousReader {
+  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
+  val SCHEMA_TIMESTAMP = StructType(
+StructField("value", StringType)
+  :: StructField("timestamp", TimestampType) :: Nil)
+  val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US)
+}
+
+/**
+ * A ContinuousReader that reads text lines through a TCP socket, designed 
only for tutorials and
+ * debugging. This ContinuousReader will *not* work in production 
applications due to multiple
+ * reasons, including no support for fault recovery.
+ *
+ * The driver maintains a socket connection to the host-port, keeps the 
received messages in
+ * buckets and serves the messages to the executors via a RPC endpoint.
+ */
+class TextSocketContinuousReader(options: DataSourceOptions) extends 
ContinuousReader
+  with SupportsDeprecatedScanRow with Logging {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  private val host: String = options.get("host").get()
+  private val port: Int = options.get("port").get().toInt
+
+  assert(SparkSession.getActiveSession.isDefined)
+  private val spark = SparkSession.getActiveSession.get
+  private val numPartitions = spark.sparkContext.defaultParallelism
+
+  @GuardedBy("this")
+  private var socket: Socket = _
+
+  @GuardedBy("this")
+  private var readThread: Thread = _
+
+  @GuardedBy("this")
+  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, 
Timestamp)])
+
+  @GuardedBy("this")
+  private var currentOffset: Int = -1
+
+  private var startOffset: TextSocketOffset = _
+
+  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
+  @volatile private var endpointRef: RpcEndpointRef = _
+
+  initialize()
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val offs = offsets
+  .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
+  .sortBy(_.partitionId)
+  .map(_.offset)
+  .toList
+TextSocketOffset(offs)
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+TextSocketOffset(Seria

[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

2018-08-01 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21199#discussion_r207078232
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
 ---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.{Calendar, List => JList, Locale}
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.json4s.{DefaultFormats, NoTypeHints}
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, 
ContinuousRecordPartitionOffset, GetRecord}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
InputPartitionReader, SupportsDeprecatedScanRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader,
 ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+import org.apache.spark.util.RpcUtils
+
+
+object TextSocketContinuousReader {
+  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
+  val SCHEMA_TIMESTAMP = StructType(
+StructField("value", StringType)
+  :: StructField("timestamp", TimestampType) :: Nil)
+  val DATE_FORMAT = new SimpleDateFormat("-MM-dd HH:mm:ss", Locale.US)
+}
+
+/**
+ * A ContinuousReader that reads text lines through a TCP socket, designed 
only for tutorials and
+ * debugging. This ContinuousReader will *not* work in production 
applications due to multiple
+ * reasons, including no support for fault recovery.
+ *
+ * The driver maintains a socket connection to the host-port, keeps the 
received messages in
+ * buckets and serves the messages to the executors via a RPC endpoint.
+ */
+class TextSocketContinuousReader(options: DataSourceOptions) extends 
ContinuousReader
+  with SupportsDeprecatedScanRow with Logging {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  private val host: String = options.get("host").get()
+  private val port: Int = options.get("port").get().toInt
+
+  assert(SparkSession.getActiveSession.isDefined)
+  private val spark = SparkSession.getActiveSession.get
+  private val numPartitions = spark.sparkContext.defaultParallelism
+
+  @GuardedBy("this")
+  private var socket: Socket = _
+
+  @GuardedBy("this")
+  private var readThread: Thread = _
+
+  @GuardedBy("this")
+  private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, 
Timestamp)])
+
+  @GuardedBy("this")
+  private var currentOffset: Int = -1
+
+  private var startOffset: TextSocketOffset = _
+
+  private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
+  @volatile private var endpointRef: RpcEndpointRef = _
+
+  initialize()
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+assert(offsets.length == numPartitions)
+val offs = offsets
+  .map(_.asInstanceOf[ContinuousRecordPartitionOffset])
+  .sortBy(_.partitionId)
+  .map(_.offset)
+  .toList
+TextSocketOffset(offs)
--- End diff --

There is an assertion above `assert(offsets.length == numPartitions)` 
(option 1

[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source

2018-08-01 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21199#discussion_r207078197
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
 ---
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.net.Socket
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.{Calendar, List => JList, Locale}
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.json4s.{DefaultFormats, NoTypeHints}
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, 
ContinuousRecordPartitionOffset, GetRecord}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
InputPartitionReader, SupportsDeprecatedScanRow}
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader,
 ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
+import org.apache.spark.util.RpcUtils
+
+
+object TextSocketContinuousReader {
--- End diff --

The companion object can be shared. But overall I guess we need to come up with
better interfaces so that the micro-batch and continuous sources can share more
code. I would investigate that outside the scope of this PR; a rough sketch of the
shared-object idea is below.
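
As an illustration only (the object name below is hypothetical), the duplicated
constants could live in one place that both socket sources reference:

```
// Illustration only: a hypothetical shared object holding the pieces that the
// micro-batch and continuous socket sources currently duplicate.
import java.text.SimpleDateFormat
import java.util.Locale

import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

object TextSocketSourceShared {
  val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
  val SCHEMA_TIMESTAMP = StructType(
    StructField("value", StringType) :: StructField("timestamp", TimestampType) :: Nil)
  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
}
```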


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-08-01 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r207042893
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -163,7 +163,8 @@ class SourceProgress protected[sql](
   val endOffset: String,
   val numInputRows: Long,
   val inputRowsPerSecond: Double,
-  val processedRowsPerSecond: Double) extends Serializable {
+  val processedRowsPerSecond: Double,
+  val customMetrics: Option[JValue] = None) extends Serializable {
--- End diff --

Refactored to use a JSON string instead of a JValue.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-08-01 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21721
  
@HyukjinKwon , I have addressed the comments and modified SourceProgress and
SinkProgress to take a String instead of a JValue so that they can be easily used
from Java. Regarding the default value in the constructor, I am not sure it's an
issue because the object is mostly read-only; it would only matter if the user
tries to construct it from Java. I have added overloaded constructors anyway; a
simplified sketch of the idea is below. Please take a look.
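
A simplified sketch of the overload idea (fields trimmed down; this is not the
exact SourceProgress signature):

```
// Simplified sketch only: the auxiliary constructor keeps the pre-existing arity so
// Java callers that do not pass custom metrics keep working.
class SourceProgressSketch(
    val description: String,
    val numInputRows: Long,
    val customMetrics: String) extends Serializable {

  // Overloaded constructor that omits the new field
  def this(description: String, numInputRows: Long) = this(description, numInputRows, null)
}
```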


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206347147
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.CustomMetrics;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+
+/**
+ * A mix in interface for {@link DataSourceReader}. Data source readers 
can implement this
+ * interface to report custom metrics that gets reported under the
+ * {@link org.apache.spark.sql.streaming.SourceProgress}
+ *
+ */
+@InterfaceStability.Evolving
+public interface SupportsCustomReaderMetrics extends DataSourceReader {
+/**
+ * Returns custom metrics specific to this data source.
+ */
+CustomMetrics getCustomMetrics();
+
+/**
+ * Invoked if the custom metrics returned by {@link 
#getCustomMetrics()} is invalid
--- End diff --

Updated javadoc to explain the same.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source

2018-07-30 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21199
  

@HeartSaVioR , rebased with master. 

ping @jose-torres @tdas @zsxwing for review.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-07-30 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21721
  
@HeartSaVioR thanks for taking the time to review. I've addressed the comments;
can you take another look?

Regarding the mixin interface, I would like to get feedback from others.

@jose-torres @tdas @zsxwing could you take a look at the patch and also 
comment on https://github.com/apache/spark/pull/21721#discussion_r206241038 ? 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206241038
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources.v2.writer.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.CustomMetrics;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+
+/**
+ * A mix in interface for {@link DataSourceWriter}. Data source writers 
can implement this
--- End diff --

The intention was to restrict the mixin so that it can be applied only to
`DataSourceReader` and `DataSourceWriter` (the pattern followed by the other
mixins) by inheriting the appropriate types. Unfortunately there's no common
ancestor for the mixin to inherit from, so I had to duplicate the interface. I
agree that it's not ideal.

A few options:

1. Have a common ancestor marker interface (say `DataSourceComponent`) 
which is the super type of `DataSourceReader` and `DataSourceWriter`. Then we 
can have a single mixin that is a subtype of that interface. We may encounter 
similar usages for other mixins in future.
2. The mixin does not inherit anything (neither DataSourceReader nor 
DataSourceWriter). Here we cannot impose a restriction on the type of classes 
the mixin can be applied to.
3. Duplicate interfaces (the proposed option in the patch).

I prefer option 1 but would like to proceed based on the feedback; a rough sketch
of that option is below.
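
A rough sketch of option 1, with hypothetical names (written as Scala traits here;
the real interfaces are Java):

```
// Rough sketch of option 1 (all names are hypothetical): a marker super-type shared by
// the reader and writer lets a single metrics mixin be tied to data source components.
trait DataSourceComponent

trait DataSourceReaderSketch extends DataSourceComponent
trait DataSourceWriterSketch extends DataSourceComponent

// A single mixin instead of duplicated reader/writer variants
trait SupportsCustomMetricsSketch extends DataSourceComponent {
  def getCustomMetrics: String // simplified return type for the sketch
}
```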


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-30 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r206237610
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 ---
@@ -143,18 +150,50 @@ trait ProgressReporter extends Logging {
 }
 logDebug(s"Execution stats: $executionStats")
 
+// extracts custom metrics from readers and writers
+def extractMetrics(getMetrics: () => CustomMetrics,
+  onInvalidMetrics: (Exception) => Unit): 
Option[JValue] = {
+  val metrics = getMetrics()
+  if (metrics != null) {
--- End diff --

Replaced it with `Option` and `map`; a rough sketch of the resulting shape is below.
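
Roughly, the null check in the diff above becomes something like this (CustomMetrics
is stood in for by a local trait so the snippet is self-contained, and the error
handling is only an approximation):

```
// Rough sketch of the Option-based version of the null check shown in the diff above.
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

trait CustomMetricsLike { def json(): String } // stand-in for CustomMetrics

def extractMetrics(getMetrics: () => CustomMetricsLike,
                   onInvalidMetrics: Exception => Unit): Option[JValue] =
  Option(getMetrics()).map { metrics =>
    try parse(metrics.json()) catch {
      case e: Exception => onInvalidMetrics(e); JString(metrics.json())
    }
  }
```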


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21819: [SPARK-24863][SS] Report Kafka offset lag as a cu...

2018-07-19 Thread arunmahadevan
GitHub user arunmahadevan opened a pull request:

https://github.com/apache/spark/pull/21819

[SPARK-24863][SS] Report Kafka offset lag as a custom metrics

## What changes were proposed in this pull request?

This builds on top of SPARK-24748 to report 'offset lag' as a custom metric for
the Kafka structured streaming source.

This lag is the difference between the latest offsets in Kafka at the time the
metric is reported (just after a micro-batch completes) and the latest offsets
Spark has processed. It can be 0 (or close to 0) if Spark keeps up with the rate at
which messages are ingested into the Kafka topics in steady state. It measures how
far the Spark source has fallen behind (per partition) and can aid in tuning the
application.
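
A minimal sketch of the per-partition computation, assuming maps of latest and
processed offsets keyed by topic-partition (not the actual Kafka source code):

```
// Minimal sketch (assumed input shapes, not the actual Kafka source code):
// lag per partition = latest offset known in Kafka - latest offset Spark has processed.
def offsetLag(latest: Map[String, Long], processed: Map[String, Long]): Map[String, Long] =
  latest.map { case (topicPartition, latestOffset) =>
    topicPartition -> math.max(0L, latestOffset - processed.getOrElse(topicPartition, 0L))
  }
```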

## How was this patch tested?

Existing and new unit tests

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/arunmahadevan/spark SPARK-24863

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21819.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21819


commit 29919fe07191cf75f5a7651f8ac9434dc79c119d
Author: Arun Mahadevan 
Date:   2018-07-06T01:51:50Z

[SPARK-24748][SS] Support for reporting custom metrics via Streaming Query 
Progress

commit 43190e9112c3d87e482d81ac8c56097c5c513012
Author: Arun Mahadevan 
Date:   2018-07-06T18:07:28Z

Add error reporting API for custom metrics and address review comments

commit 6d4165efc9c49f73141292b6c0f318f6a3cafb23
Author: Arun Mahadevan 
Date:   2018-07-11T17:42:17Z

Added support for custom metrics in Sink and use MemorySinkV2 as an example

commit bca054f978406b257bfa4c4010e7655144fc820f
Author: Arun Mahadevan 
Date:   2018-07-11T17:59:54Z

remove kafka source metrics outside the scope of this PR

commit 5e732cba85a5c2e3ed3f0487c70c1ebe4c20b75d
Author: Arun Mahadevan 
Date:   2018-07-11T18:48:41Z

Fix scala style issues

Change-Id: I831719f1e9ef1437d9df2b3529bf0a288ef5d0fa

commit c1fc3ca1ec2e2698d1d83ca2bd3ecbecd4da76a6
Author: Arun Mahadevan 
Date:   2018-07-19T20:14:40Z

[SPARK-24863][SS] Report Kafka offset lag as a custom metrics




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-07-19 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21721
  
@jose-torres, addressed initial comments.
@tdas, can you also take a look when possible ?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...

2018-07-11 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21733
  
@HeartSaVioR , the results look promising. I am wondering if there's a way to make
this the default rather than introducing new configs. Since these are internal
details anyway, there's no need to expose any config if we can identify the old vs.
new format by looking at the fields in the row, or by introducing a row version to
differentiate the two; a rough sketch of the versioning idea is below.
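
Purely to illustrate the versioning idea (nothing below is from the patch):

```
// Purely illustrative: tag the stored value layout with a version so both formats can
// be told apart internally, without a new user-facing config.
sealed trait StateValueFormat
case object KeyRepeatedInValue extends StateValueFormat   // old layout: value repeats key columns
case object KeyStrippedFromValue extends StateValueFormat // new layout: value holds only non-key columns

def formatForVersion(version: Int): StateValueFormat =
  if (version >= 2) KeyStrippedFromValue else KeyRepeatedInValue
```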


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-07-11 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21721
  
@jose-torres I have removed the Kafka lag metrics from this PR and added writer
metrics, using the number of rows in the memory sink as an example.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21673: [SPARK-24697][SS] Fix the reported start offsets in stre...

2018-07-11 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21673
  
@tdas Closing this in favor of #21744 .


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21673: [SPARK-24697][SS] Fix the reported start offsets ...

2018-07-11 Thread arunmahadevan
Github user arunmahadevan closed the pull request at:

https://github.com/apache/spark/pull/21673


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21744: [SPARK-24697][SS] Fix the reported start offsets in stre...

2018-07-11 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21744
  
@tdas logically this is similar to 
https://github.com/apache/spark/pull/21673. Yes it makes the control flow 
better and LGTM.

Overall, the ProgressReporter is still tightly coupled with the internals of
StreamExecution, and the complexity could be reduced by making it orthogonal to
StreamExecution (composition instead of inheritance). Anyway, that can be handled
separately.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-07-06 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21721
  
@jose-torres @HeartSaVioR , I have addressed the initial comments. I will add
writer support for custom metrics and use MemorySink as an example. I am OK with
moving the Kafka custom metrics into a separate PR, but the lag metric is valuable
IMO.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-06 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r200730270
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
 ---
@@ -95,4 +95,25 @@ private object JsonUtils {
 }
 Serialization.write(result)
   }
+
+  /**
+   * Write per-topic partition lag as json string
--- End diff --

This is the difference between the latest offsets in Kafka at the time the metric
is reported (just after a micro-batch completes) and the latest offsets Spark has
processed. It can be 0 if Spark keeps up with the rate at which messages are
ingested into the Kafka topics (steady state).

I would assume we always want to cap the micro-batch size by setting
`maxOffsetsPerTrigger`. Otherwise Spark can end up processing all of the data in
the topics in a single micro-batch (e.g. if the starting offset is set to earliest,
or the streaming job is stopped for some time and then restarted). IMO we should
address this by setting some sane defaults, which are currently missing; a minimal
usage sketch follows.
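
A minimal usage sketch (placeholder broker and topic values; assumes a SparkSession
named `spark` is already in scope):

```
// Minimal usage sketch: cap how many offsets a single micro-batch may read.
// "broker1:9092" and "topic2" are placeholders; `spark` is an existing SparkSession.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "topic2")
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", "10000")
  .load()
```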

If we want to handle the custom metrics for Kafka outside the scope of this PR, I
will raise a separate one, but this can be really useful for identifying issues
like data skew in some partitions or other problems that cause Spark to fall behind
the ingestion rate.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-06 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21721#discussion_r200730240
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -178,12 +180,18 @@ class SourceProgress protected[sql](
   if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
 }
 
-("description" -> JString(description)) ~
+val jsonVal = ("description" -> JString(description)) ~
   ("startOffset" -> tryParse(startOffset)) ~
   ("endOffset" -> tryParse(endOffset)) ~
   ("numInputRows" -> JInt(numInputRows)) ~
   ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
   ("processedRowsPerSecond" -> 
safeDoubleToJValue(processedRowsPerSecond))
+
+if (customMetrics != null) {
+  jsonVal ~ ("customMetrics" -> tryParse(customMetrics.json()))
--- End diff --

Currently, in case of error, it just reports the JSON string as-is (similar to
start/end offsets). However, I agree we can add error reporting to this API. Will
address.
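
For reference, the fall-back described above amounts to something like this
(simplified; not the exact helper from the patch):

```
// Simplified sketch of the fall-back: try to parse the metrics JSON, and if that fails
// just report the raw string (mirroring how start/end offsets are handled).
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

def tryParse(json: String): JValue =
  try parse(json) catch { case _: Exception => JString(json) }
```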


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...

2018-07-05 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21721
  
@tdas @jose-torres  @HeartSaVioR 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...

2018-07-05 Thread arunmahadevan
GitHub user arunmahadevan opened a pull request:

https://github.com/apache/spark/pull/21721

[SPARK-24748][SS] Support for reporting custom metrics via StreamingQuery 
Progress

## What changes were proposed in this pull request?

Currently the Structured Streaming sources and sinks do not have a way to report
custom metrics. Providing an option to report custom metrics and making them
available via the streaming query progress would enable sources and sinks to report
custom progress information (e.g. a lag metric for the Kafka source); an
illustrative sketch follows.
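
As an illustration only (the class below is an assumption, not part of this patch),
a source could surface per-partition lag through the proposed hook:

```
// Illustration only (class name and shape are assumptions, not part of this patch):
// a source-side CustomMetrics implementation that serializes per-partition lag to JSON.
import org.apache.spark.sql.sources.v2.CustomMetrics

class KafkaLagMetrics(lagPerPartition: Map[String, Long]) extends CustomMetrics {
  override def json(): String =
    lagPerPartition.map { case (tp, lag) => "\"" + tp + "\": " + lag }
      .mkString("{", ", ", "}")
}
```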

Similar metrics can be reported for sinks as well, but I would like to get initial
feedback before proceeding further.

## How was this patch tested?

New and existing unit tests.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/arunmahadevan/spark SPARK-24748

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21721.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21721


commit b7b2c3b1c9242fe205869f108548248f71ff8203
Author: Arun Mahadevan 
Date:   2018-07-06T01:51:50Z

[SPARK-24748][SS] Support for reporting custom metrics via Streaming Query 
Progress




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21673: [SPARK-24697][SS] Fix the reported start offsets in stre...

2018-07-05 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21673
  
@tdas , thanks for your comments. Yes, there's a problem with the current
abstraction, and I didn't consider refactoring it since there have been multiple
changes to this class without changing the underlying structure, and the fields of
ExecutionStats are already accessed from multiple places within StreamExecution.

I did not think adding an extra field would increase the code complexity; however,
if you plan to do a major refactoring to simplify the logic and address these
issues, I am happy to discard this PR and help review your changes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21673: [SPARK-24697][SS] Fix the reported start offsets in stre...

2018-07-05 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21673
  
@HeartSaVioR , thanks for the inputs. Please check again.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21673: SPARK-24697: Fix the reported start offsets in streaming...

2018-06-29 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21673
  
@tdas @jose-torres @HeartSaVioR 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21673: SPARK-24697: Fix the reported start offsets in st...

2018-06-29 Thread arunmahadevan
GitHub user arunmahadevan opened a pull request:

https://github.com/apache/spark/pull/21673

SPARK-24697: Fix the reported start offsets in streaming query progress

## What changes were proposed in this pull request?

A streaming query reports progress during each trigger (e.g. after runBatch in
MicroBatchExecution). However, the reported progress has wrong offsets because the
offsets are committed first and committedOffsets is updated to availableOffsets
before the progress is reported.

This leads to misleading progress where startOffset and endOffset are always the
same, for example:

```
{
 "id" : "76bf5515-55be-46af-bc79-9fc92cc6d856",
 "runId" : "b526f0f4-24bf-4ddc-b6e8-7b0cc83bdbe8",
...
"sources" : [ {
 "description" : "KafkaV2[Subscribe[topic2]]",
 "startOffset" : {
 "topic2" : {
 "0" : 44
 }
 },
 "endOffset" : {
 "topic2" : {
 "0" : 44
 }
 },
 "numInputRows" : 11,
 "inputRowsPerSecond" : 1.099670098970309,
 "processedRowsPerSecond" : 1.8829168093118795
 } ],
...
}
```

The fix is to remember the last committed offsets before running the batch and
updating the committed offsets, and to report those offsets in the streaming query
progress; a rough sketch of the flow is below.
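
For illustration, with simplified shapes rather than the actual StreamExecution
code:

```
// Rough sketch of the fix: capture committedOffsets before the batch advances them,
// and report that snapshot as the trigger's startOffset.
var committedOffsets: Map[String, Long] = Map.empty
var availableOffsets: Map[String, Long] = Map.empty

def runTrigger(runBatch: () => Unit): (Map[String, Long], Map[String, Long]) = {
  val startOffsets = committedOffsets // remember the offsets before this batch commits
  runBatch()
  committedOffsets = availableOffsets // commit after the batch completes
  (startOffsets, committedOffsets)    // reported as (startOffset, endOffset)
}
```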

## How was this patch tested?

Existing Unit tests and running sample programs.


Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/arunmahadevan/spark SPARK-24697

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21673.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21673


commit 24d75eaadfafdd668cd30d2de3bf463ce775cd69
Author: Arun Mahadevan 
Date:   2018-06-29T21:19:00Z

SPARK-24697: Fix the reported start offsets in streaming query progress




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...

2018-06-26 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21622
  
Looks good overall, a couple of minor comments.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21622: [SPARK-24637][SS] Add metrics regarding state and...

2018-06-26 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21622#discussion_r198248243
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
 ---
@@ -39,6 +42,23 @@ class MetricsReporter(
   registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0)
   registerGauge("latency", 
_.durationMs.get("triggerExecution").longValue(), 0L)
 
+  private val timestampFormat = new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+  timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
+
+  registerGauge("eventTime-watermark",
+s => convertStringDateToMillis(s.eventTime.get("watermark")), 0L)
--- End diff --

1. nit: rename `s` => `progress` to make the intent clearer.
2. The eventTime-watermark metric needs to be reported only if the map is non-empty
(event-time queries). It could be skipped when the map is empty (processing-time
queries) to avoid confusion; see the sketch after this list.
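
A loose sketch of the guard in point 2 (types simplified; `eventTime` stands in for
the map in the progress object, and `convertStringDateToMillis` refers to the helper
in the patch above):

```
// Loose sketch: only convert when a watermark entry is actually present.
import java.util.{Map => JMap}

def watermarkMillis(eventTime: JMap[String, String], toMillis: String => Long): Long =
  Option(eventTime.get("watermark")).map(toMillis).getOrElse(0L)

// usage idea: registerGauge("eventTime-watermark",
//   progress => watermarkMillis(progress.eventTime, convertStringDateToMillis), 0L)
```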



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-26 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r198222671
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the 
threads processing
+  // partitions of the write-side RDD have been started. These will run 
indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read 
side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of 
`prev` into a local
+ * continuous shuffle, and then reads them in the task thread using 
`reader`.
+ */
+class ContinuousCoalesceRDD(
+context: SparkContext,
+numPartitions: Int,
+readerQueueSize: Int,
+epochIntervalMs: Long,
+readerEndpointName: String,
+prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = 
Array(ContinuousCoalesceRDDPartition(0))
+
+ val readerRDD = new ContinuousShuffleReadRDD(
+sparkContext,
+numPartitions,
+readerQueueSize,
+prev.getNumPartitions,
+epochIntervalMs,
+Seq(readerEndpointName))
+
+  private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+prev.getNumPartitions,
+this.name)
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+assert(split.index == 0)
+// lazy initialize endpoint so writer can send to it
+
readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+if 
(!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+  val rpcEnv = SparkEnv.get.rpcEnv
+  val outputPartitioner = new HashPartitioner(1)
+  val endpointRefs = readerRDD.endpointNames.map { endpointName =>
+  rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
+  }
+
+  val runnables = prev.partitions.map { prevSplit =>
+new Runnable() {
+  override def run(): Unit = {
+TaskContext.setTaskContext(context)
+
+val writer: ContinuousShuffleWriter = new 
RPCContinuousShuffleWriter(
+  prevSplit.index, outputPartitioner, endpointRefs.toArray)
+
+EpochTracker.initializeCurrentEpoch(
+  
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
+while (!context.isInterrupted() && !context.isCompleted()) {
+  writer.write(prev.compute(prevSplit, 
context).asInstanceOf[Iterator[UnsafeRow]])
+  // Note that current epoch is a non-inheritable thread 
local, so each writer thread
+  // can properly increment its own epoch without affecting 
the main task thread.
+  EpochTracker.incrementCurrentEpoch()
+}
+  }
+}
+  }
+
+  context.addTaskCompletionListener { ctx =>
+threadPool.shutdownNow()
+  }
+
+  
split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true
+
+  runnables.foreach(threadPool.execute)
+}
+
+readerRDD.compute(readerRDD.partitions(split.index), context)
--- E

[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-25 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197985638
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the 
threads processing
+  // partitions of the write-side RDD have been started. These will run 
indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read 
side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of 
`prev` into a local
+ * continuous shuffle, and then reads them in the task thread using 
`reader`.
+ */
+class ContinuousCoalesceRDD(
+context: SparkContext,
+numPartitions: Int,
+readerQueueSize: Int,
+epochIntervalMs: Long,
+readerEndpointName: String,
+prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = 
Array(ContinuousCoalesceRDDPartition(0))
+
+ val readerRDD = new ContinuousShuffleReadRDD(
+sparkContext,
+numPartitions,
+readerQueueSize,
+prev.getNumPartitions,
+epochIntervalMs,
+Seq(readerEndpointName))
+
+  private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+prev.getNumPartitions,
+this.name)
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+assert(split.index == 0)
+// lazy initialize endpoint so writer can send to it
+
readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+if 
(!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+  val rpcEnv = SparkEnv.get.rpcEnv
+  val outputPartitioner = new HashPartitioner(1)
+  val endpointRefs = readerRDD.endpointNames.map { endpointName =>
+  rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
+  }
+
+  val runnables = prev.partitions.map { prevSplit =>
+new Runnable() {
+  override def run(): Unit = {
+TaskContext.setTaskContext(context)
+
+val writer: ContinuousShuffleWriter = new 
RPCContinuousShuffleWriter(
+  prevSplit.index, outputPartitioner, endpointRefs.toArray)
+
+EpochTracker.initializeCurrentEpoch(
+  
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
+while (!context.isInterrupted() && !context.isCompleted()) {
+  writer.write(prev.compute(prevSplit, 
context).asInstanceOf[Iterator[UnsafeRow]])
+  // Note that current epoch is a non-inheritable thread 
local, so each writer thread
+  // can properly increment its own epoch without affecting 
the main task thread.
+  EpochTracker.incrementCurrentEpoch()
+}
+  }
+}
+  }
+
+  context.addTaskCompletionListener { ctx =>
+threadPool.shutdownNow()
+  }
+
+  
split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true
+
+  runnables.foreach(threadPool.execute)
+}
+
+readerRDD.compute(readerRDD.partitions(split.index), context)
---

[GitHub] spark pull request #21617: [SPARK-24634][SS] Add a new metric regarding numb...

2018-06-25 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21617#discussion_r197984227
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -48,12 +49,13 @@ class StateOperatorProgress private[sql](
   def prettyJson: String = pretty(render(jsonValue))
 
   private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
-new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, 
memoryUsedBytes)
+new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, 
memoryUsedBytes, numLateInputRows)
 
   private[sql] def jsonValue: JValue = {
 ("numRowsTotal" -> JInt(numRowsTotal)) ~
 ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
-("memoryUsedBytes" -> JInt(memoryUsedBytes))
+("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+("numLateInputRows" -> JInt(numLateInputRows))
--- End diff --

What I meant was, if the input to the state operator is the result of the
aggregate, then we would not be counting the actual input rows to the group-by.
There would be at most one row per key, which would give the impression that there
are fewer late events than there really are (e.g. 100 late events spread across 3
keys would surface as at most 3 rows).

If that is not the case, then I am fine.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21617: [SPARK-24634][SS] Add a new metric regarding numb...

2018-06-25 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21617#discussion_r197980605
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---
@@ -48,12 +49,13 @@ class StateOperatorProgress private[sql](
   def prettyJson: String = pretty(render(jsonValue))
 
   private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
-new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, 
memoryUsedBytes)
+new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, 
memoryUsedBytes, numLateInputRows)
 
   private[sql] def jsonValue: JValue = {
 ("numRowsTotal" -> JInt(numRowsTotal)) ~
 ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
-("memoryUsedBytes" -> JInt(memoryUsedBytes))
+("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+("numLateInputRows" -> JInt(numLateInputRows))
--- End diff --

Here you are measuring the number of "keys" filtered out of the state store
because they have crossed the late threshold, correct? It may be better to rename
this metric here and in the other places to "number of evicted rows". It would be
even better if we could expose the actual number of events that were late.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-22 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197526872
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the 
threads processing
+  // partitions of the write-side RDD have been started. These will run 
indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read 
side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of 
`prev` into a local
+ * continuous shuffle, and then reads them in the task thread using 
`reader`.
+ */
+class ContinuousCoalesceRDD(
+context: SparkContext,
+numPartitions: Int,
+readerQueueSize: Int,
+epochIntervalMs: Long,
+readerEndpointName: String,
+prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = 
Array(ContinuousCoalesceRDDPartition(0))
--- End diff --

Agree. And since there's an `assert(numPartitions == 1)` in
`ContinuousCoalesceExec`, we can probably create an array of `numPartitions`
partitions here; a sketch of the fragment is below.
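
Something along these lines, shown with stand-in types so the snippet is
self-contained (the real types are Spark's Partition and the
ContinuousCoalesceRDDPartition case class from the diff above):

```
// Illustration only, with stand-in types: build one partition object per target
// partition instead of hard-coding a single index 0.
trait Partition { def index: Int }
case class ContinuousCoalesceRDDPartitionSketch(index: Int) extends Partition

def makePartitions(numPartitions: Int): Array[Partition] =
  (0 until numPartitions).map(i => ContinuousCoalesceRDDPartitionSketch(i): Partition).toArray
```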


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-22 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197564080
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the 
threads processing
+  // partitions of the write-side RDD have been started. These will run 
indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read 
side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of 
`prev` into a local
+ * continuous shuffle, and then reads them in the task thread using 
`reader`.
+ */
+class ContinuousCoalesceRDD(
+context: SparkContext,
+numPartitions: Int,
+readerQueueSize: Int,
+epochIntervalMs: Long,
+readerEndpointName: String,
+prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = 
Array(ContinuousCoalesceRDDPartition(0))
+
+ val readerRDD = new ContinuousShuffleReadRDD(
+sparkContext,
+numPartitions,
+readerQueueSize,
+prev.getNumPartitions,
+epochIntervalMs,
+Seq(readerEndpointName))
+
+  private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+prev.getNumPartitions,
+this.name)
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+assert(split.index == 0)
+// lazy initialize endpoint so writer can send to it
+
readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+if 
(!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+  val rpcEnv = SparkEnv.get.rpcEnv
+  val outputPartitioner = new HashPartitioner(1)
+  val endpointRefs = readerRDD.endpointNames.map { endpointName =>
+  rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
+  }
+
+  val runnables = prev.partitions.map { prevSplit =>
+new Runnable() {
+  override def run(): Unit = {
+TaskContext.setTaskContext(context)
+
+val writer: ContinuousShuffleWriter = new 
RPCContinuousShuffleWriter(
+  prevSplit.index, outputPartitioner, endpointRefs.toArray)
+
+EpochTracker.initializeCurrentEpoch(
+  
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
+while (!context.isInterrupted() && !context.isCompleted()) {
+  writer.write(prev.compute(prevSplit, 
context).asInstanceOf[Iterator[UnsafeRow]])
+  // Note that current epoch is a non-inheritable thread 
local, so each writer thread
+  // can properly increment its own epoch without affecting 
the main task thread.
+  EpochTracker.incrementCurrentEpoch()
+}
+  }
+}
+  }
+
+  context.addTaskCompletionListener { ctx =>
+threadPool.shutdownNow()
+  }
+
+  
split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true
+
+  runnables.foreach(threadPool.execute)
+}
+
+readerRDD.compute(readerRDD.partitions(split.index), context)
--- E

[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-22 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197561226
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.continuous.shuffle._
+import org.apache.spark.util.ThreadUtils
+
+case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
+  // This flag will be flipped on the executors to indicate that the 
threads processing
+  // partitions of the write-side RDD have been started. These will run 
indefinitely
+  // asynchronously as epochs of the coalesce RDD complete on the read 
side.
+  private[continuous] var writersInitialized: Boolean = false
+}
+
+/**
+ * RDD for continuous coalescing. Asynchronously writes all partitions of 
`prev` into a local
+ * continuous shuffle, and then reads them in the task thread using 
`reader`.
+ */
+class ContinuousCoalesceRDD(
+context: SparkContext,
+numPartitions: Int,
+readerQueueSize: Int,
+epochIntervalMs: Long,
+readerEndpointName: String,
+prev: RDD[InternalRow])
+  extends RDD[InternalRow](context, Nil) {
+
+  override def getPartitions: Array[Partition] = 
Array(ContinuousCoalesceRDDPartition(0))
+
+ val readerRDD = new ContinuousShuffleReadRDD(
+sparkContext,
+numPartitions,
+readerQueueSize,
+prev.getNumPartitions,
+epochIntervalMs,
+Seq(readerEndpointName))
+
+  private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool(
+prev.getNumPartitions,
+this.name)
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+assert(split.index == 0)
+// lazy initialize endpoint so writer can send to it
+readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+
+if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
+  val rpcEnv = SparkEnv.get.rpcEnv
+  val outputPartitioner = new HashPartitioner(1)
--- End diff --

Maybe I am missing something, but isn't this more like a repartition (it always
shuffles) than a coalesce?
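
For anyone skimming the thread, a minimal batch-RDD sketch of the distinction being
asked about (illustrative only, not the continuous-processing code under review):
`coalesce(1)` merges partitions through a narrow dependency with no shuffle, while
`repartition(1)` is `coalesce(1, shuffle = true)` and always shuffles.

import org.apache.spark.sql.SparkSession

object CoalesceVsRepartitionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("coalesce-vs-repartition").getOrCreate()
    val rdd = spark.sparkContext.parallelize(1 to 100, numSlices = 8)

    // coalesce(1): narrow dependency, partitions are merged locally, no shuffle.
    val coalesced = rdd.coalesce(1)
    // repartition(1): shorthand for coalesce(1, shuffle = true), always a full shuffle.
    val repartitioned = rdd.repartition(1)

    println(coalesced.toDebugString)     // lineage contains no ShuffledRDD
    println(repartitioned.toDebugString) // lineage contains a ShuffledRDD stage
    spark.stop()
  }
}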


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-22 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197523482
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -349,6 +349,17 @@ object UnsupportedOperationChecker {
  _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
   _: TypedFilter) =>
 case node if node.nodeName == "StreamingRelationV2" =>
+case Repartition(1, false, _) =>
+case node: Aggregate =>
+  val aboveSinglePartitionCoalesce = node.find {
+case Repartition(1, false, _) => true
+case _ => false
+  }.isDefined
+
+  if (!aboveSinglePartitionCoalesce) {
--- End diff --

Also, if there's a single parent partition and there's a `Repartition(1)`, that
node should probably be removed. Not sure if this is already being done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...

2018-06-22 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21560#discussion_r197520939
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 ---
@@ -349,6 +349,17 @@ object UnsupportedOperationChecker {
  _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias |
   _: TypedFilter) =>
 case node if node.nodeName == "StreamingRelationV2" =>
+case Repartition(1, false, _) =>
+case node: Aggregate =>
+  val aboveSinglePartitionCoalesce = node.find {
+case Repartition(1, false, _) => true
+case _ => false
+  }.isDefined
+
+  if (!aboveSinglePartitionCoalesce) {
--- End diff --

What if there was only a single partition to begin with? Then there's no need for
`Repartition(1)` and this check should be skipped.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...

2018-06-12 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21504
  
@HyukjinKwon, addressed the comments. Can you take it forward?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...

2018-06-12 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r194817758
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala 
---
@@ -96,6 +96,14 @@ object StaticSQLConf {
 .toSequence
 .createOptional
 
  val STREAMING_QUERY_LISTENERS = buildStaticConf("spark.sql.streamingQueryListeners")
--- End diff --

OK, makes sense. Renamed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...

2018-06-11 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21469#discussion_r194592510
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -112,14 +122,19 @@ trait StateStoreWriter extends StatefulOperator { 
self: SparkPlan =>
 val storeMetrics = store.metrics
 longMetric("numTotalStateRows") += storeMetrics.numKeys
 longMetric("stateMemory") += storeMetrics.memoryUsedBytes
-storeMetrics.customMetrics.foreach { case (metric, value) =>
-  longMetric(metric.name) += value
+storeMetrics.customMetrics.foreach {
+  case (metric: StateStoreCustomAverageMetric, value) =>
+longMetric(metric.name).set(value * 1.0d)
--- End diff --

Not sure if SQLAppStatusListener comes into play for reporting query progress
(e.g. StreamingQueryWrapper.lastProgress).

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala#L193

Based on my understanding, the SQLMetric is an Accumulator, so the merged value of the
accumulators across all the tasks is returned. The merge operation in SQLMetric just adds
the values, so it only makes sense for count or size metrics. We would be able to display
the (min, med, max) values in the UI for now, but not in the "query status". I was thinking
that if we make it a count metric it may work (similar to the number of state rows). I am
fine with either way.
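
To make the merge point above concrete, here is a simplified sketch (hypothetical classes,
not the actual SQLMetric code) of why an additive merge is only meaningful for count/size
style values, while an average has to be carried as a (sum, count) pair:

// Hypothetical, simplified metrics used only to illustrate the merge behaviour discussed above.
final class SumMetric(var value: Long = 0L) {
  def add(v: Long): Unit = value += v
  def merge(other: SumMetric): Unit = value += other.value // summing partial counts/sizes is correct
}

final class AvgMetric(var sum: Long = 0L, var count: Long = 0L) {
  def add(v: Long): Unit = { sum += v; count += 1 }
  def merge(other: AvgMetric): Unit = { sum += other.sum; count += other.count }
  def average: Double = if (count == 0) 0.0 else sum.toDouble / count
}

object MetricMergeDemo {
  def main(args: Array[String]): Unit = {
    // Two tasks report an average of 10 and 20 respectively.
    val t1 = new AvgMetric(10, 1)
    val t2 = new AvgMetric(20, 1)
    t1.merge(t2)
    println(t1.average) // 15.0 -- correct when (sum, count) is merged
    println(10 + 20)    // 30   -- what a plain additive merge of the two averages would report
  }
}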


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...

2018-06-11 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21469#discussion_r194483603
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
@@ -112,14 +122,19 @@ trait StateStoreWriter extends StatefulOperator { 
self: SparkPlan =>
 val storeMetrics = store.metrics
 longMetric("numTotalStateRows") += storeMetrics.numKeys
 longMetric("stateMemory") += storeMetrics.memoryUsedBytes
-storeMetrics.customMetrics.foreach { case (metric, value) =>
-  longMetric(metric.name) += value
+storeMetrics.customMetrics.foreach {
+  case (metric: StateStoreCustomAverageMetric, value) =>
+longMetric(metric.name).set(value * 1.0d)
--- End diff --

How does this get accumulated? It seems only the value that was set last may get
propagated to the driver.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...

2018-06-11 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21469#discussion_r194480087
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 ---
@@ -247,6 +253,14 @@ private[state] class HDFSBackedStateStoreProvider 
extends StateStoreProvider wit
   private lazy val fm = CheckpointFileManager.create(baseDir, hadoopConf)
   private lazy val sparkConf = 
Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
 
+  private lazy val metricProviderLoaderMapSizeBytes: 
StateStoreCustomSizeMetric =
+StateStoreCustomSizeMetric("providerLoadedMapSizeBytes",
+  "estimated size of states cache in provider")
+
+  private lazy val metricProviderLoaderCountOfVersionsInMap: 
StateStoreCustomAverageMetric =
--- End diff --

Why is "metricProviderLoaderCountOfVersionsInMap" an average metrics? The 
other metrics like "numTotalStateRows" and even "providerLoadedMapSizeBytes" is 
count metric. Shouldn't this be similar?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...

2018-06-08 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r194101270
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 ---
@@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) extends Lo
   @GuardedBy("awaitTerminationLock")
   private var lastTerminatedQuery: StreamingQuery = null
 
+  try {
+sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
+  Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
+sparkSession.sparkContext.conf).foreach(listener => {
+addListener(listener)
+logInfo(s"Registered listener ${listener.getClass.getName}")
+  })
+}
+  } catch {
+case e: Exception =>
+  throw new SparkException(s"Exception when registering StreamingQueryListener", e)
--- End diff --

Addressed


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...

2018-06-08 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r194100709
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 ---
@@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) extends Lo
   @GuardedBy("awaitTerminationLock")
   private var lastTerminatedQuery: StreamingQuery = null
 
+  try {
+sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
+  Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
+sparkSession.sparkContext.conf).foreach(listener => {
+addListener(listener)
+logInfo(s"Registered listener ${listener.getClass.getName}")
--- End diff --

Since it's logged only once and provides useful information to the user, I guess info
level is fine. A similar pattern is used here:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L2359


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...

2018-06-07 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r193923588
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 ---
@@ -55,6 +56,11 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) extends Lo
   @GuardedBy("awaitTerminationLock")
   private var lastTerminatedQuery: StreamingQuery = null
 
+  sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
+Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
+  sparkSession.sparkContext.conf).foreach(addListener)
+  }
+
--- End diff --

Good point. Addressed, please check.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...

2018-06-07 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21504
  
@HyukjinKwon, thanks for reviewing. Addressed the comments.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-07 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21500
  
Clearing the map after each commit might make things worse, since the map then needs to be
reloaded from the snapshot + delta files for the next micro-batch. Setting
`spark.sql.streaming.minBatchesToRetain` to a lower value might address the memory
consumption to some extent.

Maybe we need to explore how to avoid maintaining multiple copies of the state in memory
within the HDFS state store, or even explore RocksDB for incremental checkpointing.
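
For reference, a minimal sketch of the retention tuning mentioned above (the value 2 is
only an example, not a recommendation):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("state-store-memory-tuning")
  // Retain fewer batch versions of state; lower values reduce the memory held by the
  // HDFS-backed state store, at the cost of keeping less history around for recovery.
  .config("spark.sql.streaming.minBatchesToRetain", "2")
  .getOrCreate()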


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21504: SPARK-24480: Added config for registering streami...

2018-06-06 Thread arunmahadevan
GitHub user arunmahadevan opened a pull request:

https://github.com/apache/spark/pull/21504

SPARK-24480: Added config for registering streamingQueryListeners

## What changes were proposed in this pull request?

Currently a "StreamingQueryListener" can only be registered 
programatically. We could have a new config "spark.sql.streamingQueryListeners" 
similar to  "spark.sql.queryExecutionListeners" and "spark.extraListeners" for 
users to register custom streaming listeners.
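
As an illustration (class and package names below are hypothetical), a listener could then
be wired in purely through configuration, e.g.
`--conf spark.sql.streamingQueryListeners=com.example.monitoring.LoggingQueryListener`:

package com.example.monitoring // hypothetical package

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// A trivial listener that prints query lifecycle events; a real one might push metrics
// to an external monitoring system instead.
class LoggingQueryListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Query started: ${event.id}")
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"Query progress: ${event.progress.prettyJson}")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Query terminated: ${event.id}")
}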

## How was this patch tested?

New unit test and running example programs.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/arunmahadevan/spark SPARK-24480

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21504.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21504


commit d3a3baa8bba65da6a30d1d449f97cbeb467ca14b
Author: Arun Mahadevan 
Date:   2018-06-07T00:57:22Z

SPARK-24480: Added config for registering streamingQueryListeners




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21504: SPARK-24480: Added config for registering streamingQuery...

2018-06-06 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21504
  
ping @tdas @jose-torres @HeartSaVioR 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-06 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/spark/pull/21469
  
Nice, LGTM.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



  1   2   >