[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/23156 @gaborgsomogyi what I meant was rather than exposing a config to control the internal queue sizes, we could have a higher level config like the max pending epochs. This would act as a back pressure mechanism to stop further processing until the pending epochs are committed. I assume this would also put a limit on the three queues. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23156: [SPARK-24063][SS] Add maximum epoch queue threshold for ...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/23156 Rather than controlling the queue sizes, it would be better to limit the max epoch backlog and fail the query once that threshold is reached. There already seems to be a patch that attempted to address this: https://github.com/apache/spark/pull/21392
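A minimal sketch of the backlog-limit idea, in plain Scala with no Spark dependency. `EpochBacklog` and `maxPendingEpochs` are hypothetical names used only for illustration; they are not taken from either patch.

```scala
import scala.collection.mutable

// Hypothetical helper, not part of Spark: tracks epochs that have started
// but are not yet committed, and fails once the backlog is at its limit.
final class EpochBacklog(maxPendingEpochs: Int) {
  require(maxPendingEpochs > 0, "maxPendingEpochs must be positive")

  private val pending = mutable.SortedSet.empty[Long]

  /** Registers a new epoch; throws if the backlog is already at the limit. */
  def start(epoch: Long): Unit = {
    if (pending.size >= maxPendingEpochs) {
      throw new IllegalStateException(
        s"Epoch backlog reached $maxPendingEpochs pending epochs; failing the query")
    }
    pending += epoch
  }

  /** Marks an epoch as committed, freeing a slot in the backlog. */
  def commit(epoch: Long): Unit = pending -= epoch

  def pendingCount: Int = pending.size
}
```

A single limit on uncommitted epochs bounds every structure that holds per-epoch state, which is why it could stand in for separate per-queue size settings.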
[GitHub] spark issue #22598: [SPARK-25501][SS] Add kafka delegation token support.
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/22598 +1, LGTM.
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r228681974
--- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
@@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager(
   private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
     val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
       safeCreateProvider(new HiveDelegationTokenProvider) ++
-      safeCreateProvider(new HBaseDelegationTokenProvider)
+      safeCreateProvider(new HBaseDelegationTokenProvider) ++
+      safeCreateProvider(new KafkaDelegationTokenProvider)
--- End diff --
Yes, I think the best we can do is to document the configs and throw useful error messages that make the user aware of the "bootstrapservers" config (in case they accidentally left it set) when the spark-sql-kafka libraries are not on the classpath.
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r228671263
--- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
@@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager(
   private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
     val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
       safeCreateProvider(new HiveDelegationTokenProvider) ++
-      safeCreateProvider(new HBaseDelegationTokenProvider)
+      safeCreateProvider(new HBaseDelegationTokenProvider) ++
+      safeCreateProvider(new KafkaDelegationTokenProvider)
--- End diff --
Why I thought disabling by default might make sense: the token fetch would be attempted if just "spark.kafka.bootstrap.servers" is defined, and if this config is set, the spark-sql-kafka libraries need to be on the classpath as well. Better to mention these in the docs. We could also consider prefixing all the configs with spark.security.credentials.kafka instead of spark.kafka (like spark.security.credentials.kafka.bootstrap.servers) to make it explicit that these are security-related settings required for fetching Kafka delegation tokens.
[GitHub] spark issue #22824: [SPARK-25834] [Structured Streaming]Update Mode should n...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/22824 I mean it's easy to miss if a new "case" is added and "update" mode is not supported. Even now, how about LeftSemi, LeftAnti, FullOuter, etc.?
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r228583252
--- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+      hadoopConf: Configuration,
+      sparkConf: SparkConf,
+      creds: Credentials): Option[Long] = {
+    try {
+      val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+      val obtainToken = mirror.classLoader.
+        loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+        getMethod("obtainToken", classOf[SparkConf])
--- End diff --
spark-core does not have a dependency on spark-sql-kafka so this is needed.
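A minimal sketch of looking up a class by name at runtime, which is the general technique that avoids a compile-time dependency on an optional module. `OptionalModule`, its `hint` parameter, and the class name `org.example.DoesNotExist` in the usage note are hypothetical; the sketch uses only `Class.forName` from the JDK and is not the code from the pull request.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not part of Spark.
object OptionalModule {
  /**
   * Looks up a class by name at runtime, so the caller needs no compile-time
   * dependency on the module that provides it. Returns a readable message
   * instead of a stack trace when the class is missing.
   */
  def load(className: String, hint: String): Either[String, Class[_]] =
    Try(Class.forName(className)) match {
      case Success(cls) => Right(cls)
      case Failure(_: ClassNotFoundException) =>
        Left(s"$className is not on the classpath. $hint")
      case Failure(e) =>
        Left(s"Could not load $className: ${e.getMessage}")
    }
}
```

For example, `OptionalModule.load("org.example.DoesNotExist", "Add the module jar.")` returns a `Left` whose message names the missing class and carries the hint, which is the kind of error a user could act on.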
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r228339774
--- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+      hadoopConf: Configuration,
+      sparkConf: SparkConf,
+      creds: Credentials): Option[Long] = {
+    try {
+      val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+      val obtainToken = mirror.classLoader.
+        loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+        getMethod("obtainToken", classOf[SparkConf])
+
+      logDebug("Attempting to fetch Kafka security token.")
+      val token = obtainToken.invoke(null, sparkConf)
+        .asInstanceOf[Token[_ <: TokenIdentifier]]
+      creds.addToken(token.getService, token)
+    } catch {
+      case NonFatal(e) =>
+        logInfo(s"Failed to get token from service $serviceName", e)
+    }
+
+    None
--- End diff --
Shouldn't this return the time of the next renewal? Otherwise, how does the token manager know when the token should be renewed or recreated?
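A minimal sketch of why the `Option[Long]` return value matters to a caller that schedules renewals. `TokenProvider` and `RenewalSchedule` are hypothetical stand-ins written for this illustration, not Spark's classes; the assumption is that each provider reports its next renewal time in epoch milliseconds, or `None` when no renewal is needed.

```scala
// Hypothetical stand-in for a delegation token provider.
trait TokenProvider {
  def serviceName: String

  /** Next renewal time in epoch milliseconds, or None if no renewal is needed. */
  def obtainToken(nowMs: Long): Option[Long]
}

object RenewalSchedule {
  /** Earliest renewal time across all providers, or None if nothing needs renewing. */
  def nextRenewal(providers: Seq[TokenProvider], nowMs: Long): Option[Long] =
    providers.flatMap(_.obtainToken(nowMs)).reduceOption(_ min _)
}
```

Under this assumption, a provider that always returns `None` is indistinguishable from one whose token never expires, so the scheduler would never come back to refresh it.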
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r228321793
--- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala ---
@@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager(
   private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
     val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
       safeCreateProvider(new HiveDelegationTokenProvider) ++
-      safeCreateProvider(new HBaseDelegationTokenProvider)
+      safeCreateProvider(new HBaseDelegationTokenProvider) ++
+      safeCreateProvider(new KafkaDelegationTokenProvider)
--- End diff --
Update the class docs to list which providers are loaded by default, or better, set the default for `spark.security.credentials.kafka.enabled` to false.
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r228320944
--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -647,4 +647,42 @@ package object config {
       .stringConf
       .toSequence
       .createWithDefault(Nil)
+
+  private[spark] val KAFKA_DELEGATION_TOKEN_ENABLED =
+    ConfigBuilder("spark.kafka.delegation.token.enabled")
+      .doc("Set to 'true' for obtaining delegation token from kafka.")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val KAFKA_BOOTSTRAP_SERVERS =
+    ConfigBuilder("spark.kafka.bootstrap.servers")
--- End diff --
Given the defaults, the token fetch would be attempted if only `spark.kafka.bootstrap.servers` is defined, right? And the spark-sql-kafka libraries need to be on the classpath as well? Better to mention these in the docs, and make `spark.security.credentials.kafka.enabled` default to false if it makes sense. Also consider prefixing all the configs with `spark.security.credentials.kafka` instead of `spark.kafka` (like `spark.security.credentials.kafka.bootstrap.servers`) to make it explicit that these are security-related settings required for fetching Kafka delegation tokens.
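A minimal sketch of the opt-in behaviour suggested in the comment, using a plain `Map[String, String]` in place of `SparkConf`. The two key names are the ones proposed in the comment; `KafkaTokenConfig` and `shouldFetchToken` are hypothetical and do not describe what the patch finally implemented.

```scala
// Hypothetical helper, not part of Spark.
object KafkaTokenConfig {
  // Key names as proposed in the review comment.
  val Enabled = "spark.security.credentials.kafka.enabled"
  val BootstrapServers = "spark.security.credentials.kafka.bootstrap.servers"

  /** Fetch only when explicitly enabled AND a bootstrap server list is present. */
  def shouldFetchToken(conf: Map[String, String]): Boolean =
    conf.get(Enabled).exists(_.trim.equalsIgnoreCase("true")) &&
      conf.get(BootstrapServers).exists(_.trim.nonEmpty)
}
```

With a default of "not enabled", setting only the bootstrap servers no longer triggers a token fetch, so a missing spark-sql-kafka jar cannot surprise a user who never asked for Kafka tokens.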
[GitHub] spark issue #22824: [SPARK-25834] [Structured Streaming]Update Mode should n...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/22824 It may be enough to do the check just once rather than repeating similar checks for inner, leftOuter and rightOuter. For example, have a single check before the `joinType match {` clause.
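A minimal sketch of hoisting one output-mode check above the `joinType match`. The types and the support rules here are hypothetical and simplified for illustration; they are not Spark's `UnsupportedOperationChecker` or its actual rules.

```scala
sealed trait JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType

sealed trait OutputMode
case object Append extends OutputMode
case object Update extends OutputMode

// Hypothetical checker with made-up rules, for illustration only.
object JoinCheck {
  /** Returns an error message if the combination is rejected, otherwise None. */
  def validate(joinType: JoinType, mode: OutputMode): Option[String] =
    if (mode == Update) {
      // One up-front check covers every join type, including cases added later.
      Some(s"$joinType join is not supported in $mode output mode")
    } else {
      joinType match {
        case Inner | LeftOuter | RightOuter => None
        case other => Some(s"$other join is not supported")
      }
    }
}
```

Because the mode check runs before the match, a newly added `case` cannot forget it, which is the concern raised about LeftSemi, LeftAnti and FullOuter.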
[GitHub] spark issue #22482: WIP - [SPARK-10816][SS] Support session window natively
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/22482 +1 for the idea to provide native session window support. On the approach, it would be ideal if all windowing aggregations could be handled via a single plan and state store (vs. the separate plan and state store the patch proposes for session windows). The underlying steps are more or less the same for Fixed, Session and Sliding windows. The sort/merge operations have to be part of a window merge function rather than the plan itself. K,Values -> AssignWindows (produces [k, v, timestamp, window]) -> GroupByKey (shuffle) -> MergeWindows (optional step) -> GroupWindows -> aggregate values. Based on how we want to approach it, it could be handled now or as a follow-up item (with major refactoring).
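The pipeline above can be sketched on in-memory collections. This is an illustrative sketch only, assuming half-open windows `[start, end)` and a fixed session gap; `Window`, `SessionWindows` and its methods are hypothetical names, and nothing here reflects how the patch or Spark's planner implements windowing.

```scala
// Half-open window [start, end) in event-time milliseconds.
final case class Window(start: Long, end: Long)

// Hypothetical in-memory illustration of the steps, not Spark code.
object SessionWindows {
  /** AssignWindows: each event opens a window of length `gap` at its timestamp. */
  def assign(ts: Long, gap: Long): Window = Window(ts, ts + gap)

  /** MergeWindows: collapses overlapping or touching windows into sessions. */
  def merge(windows: Seq[Window]): List[Window] =
    windows.sortBy(_.start).foldLeft(List.empty[Window]) {
      case (last :: rest, w) if w.start <= last.end =>
        Window(last.start, math.max(last.end, w.end)) :: rest
      case (acc, w) => w :: acc
    }.reverse

  /** GroupByKey, then GroupWindows and aggregate: event count per key and session. */
  def countPerSession(events: Seq[(String, Long)], gap: Long): Map[(String, Window), Int] =
    events.groupBy(_._1).toSeq.flatMap { case (key, evs) =>
      val sessions = merge(evs.map { case (_, ts) => assign(ts, gap) })
      sessions.map { s =>
        (key, s) -> evs.count { case (_, ts) => ts >= s.start && ts < s.end }
      }
    }.toMap
}
```

For fixed or sliding windows only `assign` would change and `merge` would be skipped, which is the sense in which the steps are "more or less the same" across window types.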
[GitHub] spark pull request #22299: [SPARK-24748][SS][FOLLOWUP] Switch custom metrics...
Github user arunmahadevan closed the pull request at: https://github.com/apache/spark/pull/22299
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21721
> It seems like its life cycle should be bound to an epoch, but unfortunately we don't have such an interface in continuous streaming to represent an epoch. Is it possible that we may end up with 2 sets of custom metrics APIs for micro-batch and continuous?

@cloud-fan we could still report progress at the end of each epoch (e.g. [here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala#L231) and via the EpochCoordinator). There need not be separate interfaces for the progress or the custom metrics; only the mechanisms could be different.
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21721 I created a follow-up PR to move CustomMetrics (and a few other streaming-specific interfaces in that package) to 'streaming' and mark the interfaces as Unstable: https://github.com/apache/spark/pull/22299
[GitHub] spark pull request #22299: [SPARK-24748][SS][FOLLOWUP] Switch custom metrics...
GitHub user arunmahadevan opened a pull request: https://github.com/apache/spark/pull/22299
[SPARK-24748][SS][FOLLOWUP] Switch custom metrics to Unstable APIs
- Mark custom metrics related APIs as unstable
- Move CustomMetrics (and a few other streaming interfaces in parent package) to streaming package
Ideally could move `v2/reader/streaming` and `v2/writer/streaming` under `streaming/reader` and `streaming/writer` but that can be a follow up PR if required.
## How was this patch tested?
Existing unit tests
Please review http://spark.apache.org/contributing.html before opening a pull request.
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/arunmahadevan/spark refactor
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/spark/pull/22299.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #22299
commit 49a94c6016a0a4cd6076329797f4c2ac5a9cb588
Author: Arun Mahadevan
Date: 2018-08-31T05:53:57Z
    [SPARK-24748][SS][FOLLOWUP] Switch custom metrics to Unstable APIs
    - Mark custom metrics related APIs as unstable
    - Move streaming related interfaces to streaming package
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21721 @rxin it's for streaming sources and sinks, as explained in the [doc](https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/sources/v2/CustomMetrics.java#L23). It had to be shared between classes in reader.streaming and writer.streaming, so it was added in the parent package (similar to other streaming-specific classes that exist here, like [StreamingWriteSupportProvider.java](https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider.java) and [MicroBatchReadSupportProvider.java](https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupportProvider.java)). We could move all of it to a streaming package.
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21721 @HyukjinKwon yes, we can mark it unstable. As I mentioned multiple times in previous comments, the traits added here, like CustomMetrics, SupportsCustomReaderMetrics, etc., have nothing specific to micro-batch or continuous mode and are unaffected when we finally start reporting progress for continuous mode. The way to collect and report metrics in continuous mode needs to be figured out, and I think it should be discussed in the respective JIRAs.
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21721 There are many unknowns to be figured out for continuous mode. Though the way to capture the metrics would be different for continuous execution, the interface of what's reported is not expected to change. We already report progress for micro-batch, and as a user of Spark I find the changes in the patch quite useful for reporting custom metrics with what works right now. The patch also does not impact other parts of the DataSourceV2 APIs (only the sources that want to report custom metrics would add the traits). IMO, we can keep this and continue to investigate, in a time-bound manner, how to capture metrics for continuous mode.
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21721 The CustomMetrics are traits which can be mixed in if necessary (see https://github.com/apache/spark/pull/21721#issuecomment-403878383) and do not affect any other API as such. When query progress is supported for continuous mode, changes can be made if necessary. I can further investigate how to report query progress for continuous mode.
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21721 @zsxwing @gatorsmile, this PR does not add new APIs as such. It builds on the existing StreamingQueryProgress and adds custom metrics to it. StreamingQueryProgress as such is not reported for continuous mode. When it's reported, this would be part of that. Are you proposing to not report StreamingQueryProgress at all for micro-batch until things are figured out for continuous mode? Otherwise I don't see how adding this would break things. cc @jose-torres
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21721 For continuous queries, the progress could still be reported by posting QueryProgressEvent to the listener for each epoch (instead of each micro-batch). The `StreamingQueryProgress` could also mostly stay the same. I am not clear on how the continuous query would get the metric updates. We may need some mechanism to post metric updates while the query continues to run. Right now the SQL metrics rely on accumulators, and the accumulators might not be updated until the task completes.
[GitHub] spark pull request #22251: [SPARK-25260][SQL] Fix namespace handling in Sche...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/22251#discussion_r213407599
--- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -1099,6 +1098,27 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }

+  test("check namespace - toAvroType") {
+    val sparkSchema = StructType(Seq(
+      StructField("name", StringType, nullable = false),
+      StructField("address", StructType(Seq(
+        StructField("city", StringType, nullable = false),
+        StructField("state", StringType, nullable = false))),
+        nullable = false)))
+    val employeeType = SchemaConverters.toAvroType(sparkSchema,
+      recordName = "employee",
+      nameSpace = "foo.bar")
--- End diff --
Added a test case for toAvroType with empty namespace
[GitHub] spark pull request #22251: [SPARK-25260][SQL] Fix namespace handling in Sche...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/22251#discussion_r213407441
--- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -1099,6 +1098,27 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }

+  test("check namespace - toAvroType") {
--- End diff --
It's sort of covered in the existing cases below. Do you think we need more?
[Validate namespace in avro file that has nested records with the same name](https://github.com/apache/spark/blob/master/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala#L1078)
[conversion to avro and back with namespace](https://github.com/apache/spark/blob/master/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala#L510)
[GitHub] spark issue #22251: [SPARK-25260][SQL] Fix namespace handling in SchemaConve...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/22251 cc @gengliangwang @dongjoon-hyun
[GitHub] spark pull request #22251: [SPARK-25260][SQL] Fix namespace handling in Sche...
GitHub user arunmahadevan opened a pull request: https://github.com/apache/spark/pull/22251
[SPARK-25260][SQL] Fix namespace handling in SchemaConverters.toAvroType
## What changes were proposed in this pull request?
`toAvroType` converts a Spark data type to an Avro schema. It always appends the record name to the namespace, so it's impossible to have an Avro namespace independent of the record name.
When invoked with a Spark data type like,
```scala
val sparkSchema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("address", StructType(Seq(
    StructField("city", StringType, nullable = false),
    StructField("state", StringType, nullable = false))),
    nullable = false)))

// map it to an avro schema with record name "employee" and top level namespace "foo.bar",
val avroSchema = SchemaConverters.toAvroType(sparkSchema, false, "employee", "foo.bar")

// result is
// avroSchema.getName = employee
// avroSchema.getNamespace = foo.bar.employee
// avroSchema.getFullName = foo.bar.employee.employee
```
The patch proposes to fix this so that the result is
```
avroSchema.getName = employee
avroSchema.getNamespace = foo.bar
avroSchema.getFullName = foo.bar.employee
```
## How was this patch tested?
New and existing unit tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/arunmahadevan/spark avro-fix
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/spark/pull/22251.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #22251
commit f47483951e12d563b7696940a2cfc2fdc3b27ab2
Author: Arun Mahadevan
Date: 2018-08-28T08:00:17Z
    [SPARK-25260][SQL] Fix namespace handling in SchemaConverters.toAvroType
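A minimal sketch of the naming arithmetic behind the before/after results in the description, in plain Scala with no Avro dependency. `AvroNames` and both of its methods are hypothetical; the only rule assumed is that an Avro full name is the namespace and the name joined by a dot. Only the top-level record is modelled.

```scala
// Hypothetical helper, not part of Spark or Avro.
object AvroNames {
  /** Full name: namespace and name joined by a dot, or just the name if there is no namespace. */
  def fullName(name: String, namespace: String): String =
    if (namespace.isEmpty) name else s"$namespace.$name"

  /** The behaviour described as the bug: the record name is appended to the namespace first. */
  def namespaceWithRecordName(recordName: String, namespace: String): String =
    if (namespace.isEmpty) recordName else s"$namespace.$recordName"
}
```

With record name `employee` and namespace `foo.bar`, appending first gives the namespace `foo.bar.employee` and therefore the full name `foo.bar.employee.employee`; using the namespace unchanged gives `foo.bar.employee`.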
[GitHub] spark pull request #22238: [SPARK-25245][DOCS][SS] Explain regarding limitin...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/22238#discussion_r213049895
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -2812,6 +2812,12 @@ See [Input Sources](#input-sources) and [Output Sinks](#output-sinks) sections f

 # Additional Information

+**Gotchas**
--- End diff --
IMO, it would be better to keep it here as well as in the code; we may not be able to surface it in the right API docs, and there is a chance users will ignore it. @HeartSaVioR, maybe add an example here to illustrate how to use coalesce?
[GitHub] spark pull request #22121: [SPARK-25133][SQL][Doc]Avro data source guide
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/22121#discussion_r212031015
--- Diff: docs/avro-data-source-guide.md ---
@@ -0,0 +1,377 @@
+---
+layout: global
+title: Apache Avro Data Source Guide
+---
+
+* This will become a table of contents (this text will be scraped).
+{:toc}
+
+Since Spark 2.4 release, [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) provides built-in support for reading and writing Apache Avro data.
+
+## Deploying
+The `spark-avro` module is external and not included in `spark-submit` or `spark-shell` by default.
+
+As with any Spark applications, `spark-submit` is used to launch your application. `spark-avro_{{site.SCALA_BINARY_VERSION}}`
+and its dependencies can be directly added to `spark-submit` using `--packages`, such as,
+
+    ./bin/spark-submit --packages org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...
+
+For experimenting on `spark-shell`, you can also use `--packages` to add `org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}` and its dependencies directly,
+
+    ./bin/spark-shell --packages org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...
+
+See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies.
+
+## Load and Save Functions
+
+Since `spark-avro` module is external, there is no `.avro` API in
+`DataFrameReader` or `DataFrameWriter`.
+
+To load/save data in Avro format, you need to specify the data source option `format` as `avro`(or `org.apache.spark.sql.avro`).
+
+
+{% highlight scala %}
+
+val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
+usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
+
+{% endhighlight %}
+
+
+{% highlight java %}
+
+Dataset<Row> usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro");
+usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro");
+
+{% endhighlight %}
+
+
+{% highlight python %}
+
+df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
+df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
+
+{% endhighlight %}
+
+
+{% highlight r %}
+
+df <- read.df("examples/src/main/resources/users.avro", "avro")
+write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro")
+
+{% endhighlight %}
+
+
+
+## to_avro() and from_avro()
+Spark SQL provides function `to_avro` to encode a struct as a string and `from_avro()` to retrieve the struct as a complex type.
--- End diff --
Does it need to be a struct, or can it be any Spark SQL type? Maybe: `to_avro` to encode Spark SQL types as Avro bytes and `from_avro` to retrieve Avro bytes as Spark SQL types?
[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/22143#discussion_r211339535
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
@@ -61,12 +63,30 @@ private[kafka010] class KafkaWriteTask(
 private[kafka010] abstract class KafkaRowWriter(
     inputSchema: Seq[Attribute], topic: Option[String]) {

+  import scala.collection.JavaConverters._
+
+  protected val minOffsetAccumulator: collection.concurrent.Map[TopicPartition, Long] =
+    new ConcurrentHashMap[TopicPartition, Long]().asScala
--- End diff --
Why is this a concurrent map?
[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/22143#discussion_r211336988
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala ---
@@ -116,3 +133,66 @@ class KafkaStreamDataWriter(
     }
   }
 }
+
+private[kafka010] case class KafkaWriterCustomMetrics(
+    minOffset: KafkaSourceOffset,
+    maxOffset: KafkaSourceOffset) extends CustomMetrics {
+  override def json(): String = {
+    val jsonVal = ("minOffset" -> parse(minOffset.json)) ~
+      ("maxOffset" -> parse(maxOffset.json))
+    compact(render(jsonVal))
+  }
+
+  override def toString: String = json()
+}
+
+private[kafka010] object KafkaWriterCustomMetrics {
+
+  import Math.{min, max}
+
+  def apply(messages: Array[WriterCommitMessage]): KafkaWriterCustomMetrics = {
+    val minMax = collate(messages)
+    KafkaWriterCustomMetrics(minMax._1, minMax._2)
+  }
+
+  private def collate(messages: Array[WriterCommitMessage]):
--- End diff --
It would be good to leave a comment on what this does. It seems to be computing the min/max offset per partition? If so, choosing an apt name for that function would make it clearer.
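A minimal sketch of what a descriptively named version of such a function might look like, assuming each commit message carries a per-partition offset range. `TopicPartition` and `OffsetRange` are hypothetical stand-ins defined here (not Kafka's or Spark's classes), and `minMaxPerPartition` is only one possible name; the body of `collate` is not shown in the diff, so this is a guess at the intent rather than a copy of it.

```scala
// Hypothetical stand-ins for the real Kafka/Spark types.
final case class TopicPartition(topic: String, partition: Int)
final case class OffsetRange(min: Long, max: Long)

object OffsetRanges {
  /**
   * Combines the per-task offset ranges into one range per partition:
   * the smallest `min` and the largest `max` seen for that partition.
   */
  def minMaxPerPartition(
      messages: Seq[Map[TopicPartition, OffsetRange]]): Map[TopicPartition, OffsetRange] =
    messages.flatten.groupBy(_._1).map { case (tp, entries) =>
      val ranges = entries.map(_._2)
      tp -> OffsetRange(ranges.map(_.min).min, ranges.map(_.max).max)
    }
}
```

A name that states the result (min and max per partition) plus a one-line doc comment answers the question the reviewer had to ask.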
[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/22143#discussion_r211336368 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala --- @@ -19,18 +19,23 @@ package org.apache.spark.sql.kafka010 import scala.collection.JavaConverters._ +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery +import org.apache.spark.sql.sources.v2.CustomMetrics import org.apache.spark.sql.sources.v2.writer._ -import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.sql.sources.v2.writer.streaming.{StreamWriter, SupportsCustomWriterMetrics} import org.apache.spark.sql.types.StructType /** * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we * don't need to really send one. */ -case object KafkaWriterCommitMessage extends WriterCommitMessage +case class KafkaWriterCommitMessage(minOffset: KafkaSourceOffset, maxOffset: KafkaSourceOffset) --- End diff -- It's kind of odd that the writer commit message includes a source offset. IMO, it would be better to define a `KafkaSinkOffset` or, if it can be common, something like `KafkaOffsets`.
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21919 LGTM overall except one minor comment.
[GitHub] spark pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkPro...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21919#discussion_r210707152 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -254,3 +259,10 @@ class SinkProgress protected[sql]( } } } + +private[sql] object SinkProgress { + val DEFAULT_NUM_OUTPUT_ROWS: Long = -1L --- End diff -- Does it result in sink progress output with "numOutputRows = -1"? Maybe add numOutputRows to the output only if the value is not the default.
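The suggestion above (emit `numOutputRows` only when it differs from the default) can be sketched as follows. This is a simplified illustration, not the `SinkProgress` implementation: `SinkProgressJson` and `render` are hypothetical names, the JSON is assembled by hand to keep the example dependency-free, and only the `-1` sentinel is taken from the diff.

```scala
object SinkProgressJson {
  // Sentinel from the diff under review: -1 means the row count is unknown.
  val DefaultNumOutputRows: Long = -1L

  // Minimal escaping for the two characters that would break a JSON string here.
  private def escape(s: String): String =
    s.replace("\\", "\\\\").replace("\"", "\\\"")

  /** Renders a sink progress entry, leaving out numOutputRows when it is the default. */
  def render(description: String, numOutputRows: Long = DefaultNumOutputRows): String = {
    val fields = Seq(
      Some("\"description\":\"" + escape(description) + "\""),
      // The field is added only when a real count was reported.
      if (numOutputRows != DefaultNumOutputRows) Some("\"numOutputRows\":" + numOutputRows)
      else None
    ).flatten
    fields.mkString("{", ",", "}")
  }
}
```

With this shape, a sink that never reports a count produces output without a misleading `-1`, while a sink that does report one is unaffected.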
[GitHub] spark issue #21819: [SPARK-24863][SS] Report Kafka offset lag as a custom me...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21819 @HyukjinKwon, can you take it forward? I appreciate your effort, and thanks in advance.
[GitHub] spark pull request #21819: [SPARK-24863][SS] Report Kafka offset lag as a cu...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21819#discussion_r209699459 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala --- @@ -95,4 +95,20 @@ private object JsonUtils { } Serialization.write(result) } + + /** + * Write per-topic partition lag as json string + */ + def partitionLags(latestOffsets: Map[TopicPartition, Long], +processedOffsets: Map[TopicPartition, Long]): String = { +val result = new HashMap[String, HashMap[Int, Long]]() --- End diff -- I had followed the style used in other parts of the class. Addressed, and refactored the other places as well.
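As a rough illustration of what a per-topic partition lag report involves, the sketch below computes the lag for each partition and groups the result by topic. It is not the `JsonUtils.partitionLags` code from the PR: `PartitionLag`, `lags`, and `toJson` are hypothetical names, `TopicPartition` is a local stand-in for the Kafka class, the JSON is built by hand instead of with json4s, and treating a partition without a processed offset as starting from 0 is an assumption.

```scala
object PartitionLag {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  /** Lag per partition: latest available offset minus the last processed offset. */
  def lags(
      latestOffsets: Map[TopicPartition, Long],
      processedOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] =
    latestOffsets.map { case (tp, latest) =>
      // Assumption: a partition with no processed offset counts from offset 0.
      tp -> (latest - processedOffsets.getOrElse(tp, 0L))
    }

  /**
   * Renders {"topic":{"partition":lag}}, sorted for stable output.
   * Topic names are not escaped in this sketch.
   */
  def toJson(partitionLags: Map[TopicPartition, Long]): String =
    partitionLags.toSeq
      .groupBy { case (tp, _) => tp.topic }
      .toSeq
      .sortBy { case (topic, _) => topic }
      .map { case (topic, entries) =>
        val partitions = entries
          .sortBy { case (tp, _) => tp.partition }
          .map { case (tp, lag) => "\"" + tp.partition + "\":" + lag }
          .mkString("{", ",", "}")
        "\"" + topic + "\":" + partitions
      }
      .mkString("{", ",", "}")
}
```

Building the result from immutable collections, as here, is one way to avoid the nested mutable `HashMap` that the review thread is about.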
[GitHub] spark pull request #21819: [SPARK-24863][SS] Report Kafka offset lag as a cu...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21819#discussion_r209699010 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala --- @@ -95,4 +95,20 @@ private object JsonUtils { } Serialization.write(result) } + + /** + * Write per-topic partition lag as json string + */ + def partitionLags(latestOffsets: Map[TopicPartition, Long], +processedOffsets: Map[TopicPartition, Long]): String = { --- End diff -- Addressed. Would it be possible to add this to the Scala style checks?
[GitHub] spark issue #21819: [SPARK-24863][SS] Report Kafka offset lag as a custom me...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21819 @HeartSaVioR @HyukjinKwon @jose-torres @tdas would you mind taking a look?
[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21199 @HyukjinKwon this has been open for a while, would you mind taking this forward?
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r208106031 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -196,6 +237,18 @@ trait ProgressReporter extends Logging { currentStatus = currentStatus.copy(isTriggerActive = false) } + /** Extract writer from the executed query plan. */ + private def dataSourceWriter: Option[DataSourceWriter] = { +if (lastExecution == null) return None +lastExecution.executedPlan.collect { + case p if p.isInstanceOf[WriteToDataSourceV2Exec] => --- End diff -- Yes, currently progress is reported only for micro-batch mode. This should be supported for continuous mode as well once we start reporting progress there, but it needs some more work: https://issues.apache.org/jira/browse/SPARK-23887
[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21199 retest this please
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21721 @HyukjinKwon, the master code changed and I had to rebase and fix issues. Can you take it forward? There seem to be unrelated test failures in the Kafka 0.10 integration suite.
[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21199 retest this please
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21721 retest this please
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress v...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21919 `numOutputRows` makes sense for all sinks, but I agree the counting should be done by the framework and not by individual sinks. Sinks with metrics that do not apply to all sinks could report them as custom metrics if they want to. Here's a proposal to collect and report custom metrics for sources and sinks: https://github.com/apache/spark/pull/21721
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21721 The tests keep failing and the failures look unrelated. @HyukjinKwon, let me know if you think there's something I should look into.
[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21199 retest this please
[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21199#discussion_r207237894 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.net.Socket +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.{Calendar, List => JList, Locale} +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.json4s.{DefaultFormats, NoTypeHints} +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord} +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} +import org.apache.spark.util.RpcUtils + + +object TextSocketContinuousReader { + val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) + val SCHEMA_TIMESTAMP = StructType( +StructField("value", StringType) + :: StructField("timestamp", TimestampType) :: Nil) + val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US) +} + +/** + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and + * debugging. This ContinuousReader will *not* work in production applications due to multiple + * reasons, including no support for fault recovery. + * + * The driver maintains a socket connection to the host-port, keeps the received messages in + * buckets and serves the messages to the executors via a RPC endpoint. 
+ */ +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader + with SupportsDeprecatedScanRow with Logging { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + private val host: String = options.get("host").get() + private val port: Int = options.get("port").get().toInt + + assert(SparkSession.getActiveSession.isDefined) + private val spark = SparkSession.getActiveSession.get + private val numPartitions = spark.sparkContext.defaultParallelism + + @GuardedBy("this") + private var socket: Socket = _ + + @GuardedBy("this") + private var readThread: Thread = _ + + @GuardedBy("this") + private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)]) + + @GuardedBy("this") + private var currentOffset: Int = -1 + + private var startOffset: TextSocketOffset = _ + + private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this) + @volatile private var endpointRef: RpcEndpointRef = _ + + initialize() + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val offs = offsets + .map(_.asInstanceOf[ContinuousRecordPartitionOffset]) + .sortBy(_.partitionId) + .map(_.offset) + .toList +TextSocketOffset(offs) + } + + override def deserializeOffset(json: String): Offset = { +TextSocketOffset(Seria
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r207232187 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -163,7 +163,27 @@ class SourceProgress protected[sql]( val endOffset: String, val numInputRows: Long, val inputRowsPerSecond: Double, - val processedRowsPerSecond: Double) extends Serializable { + val processedRowsPerSecond: Double, + val customMetrics: String) extends Serializable { + + /** SourceProgress without custom metrics. */ + def this( --- End diff -- Changed.
[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21199 @HeartSaVioR, addressed your comments. Let me know if I missed something. I also rebased and had to change more code to use the new interfaces. I hope we can speed up the review cycles in general, rather than leaving PRs in hibernation for a while, after which the developer loses the context and other things will have changed in the meantime.
[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21199#discussion_r207078232 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.net.Socket +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.{Calendar, List => JList, Locale} +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.json4s.{DefaultFormats, NoTypeHints} +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord} +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} +import org.apache.spark.util.RpcUtils + + +object TextSocketContinuousReader { + val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) + val SCHEMA_TIMESTAMP = StructType( +StructField("value", StringType) + :: StructField("timestamp", TimestampType) :: Nil) + val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US) +} + +/** + * A ContinuousReader that reads text lines through a TCP socket, designed only for tutorials and + * debugging. This ContinuousReader will *not* work in production applications due to multiple + * reasons, including no support for fault recovery. + * + * The driver maintains a socket connection to the host-port, keeps the received messages in + * buckets and serves the messages to the executors via a RPC endpoint. 
+ */ +class TextSocketContinuousReader(options: DataSourceOptions) extends ContinuousReader + with SupportsDeprecatedScanRow with Logging { + implicit val defaultFormats: DefaultFormats = DefaultFormats + + private val host: String = options.get("host").get() + private val port: Int = options.get("port").get().toInt + + assert(SparkSession.getActiveSession.isDefined) + private val spark = SparkSession.getActiveSession.get + private val numPartitions = spark.sparkContext.defaultParallelism + + @GuardedBy("this") + private var socket: Socket = _ + + @GuardedBy("this") + private var readThread: Thread = _ + + @GuardedBy("this") + private val buckets = Seq.fill(numPartitions)(new ListBuffer[(String, Timestamp)]) + + @GuardedBy("this") + private var currentOffset: Int = -1 + + private var startOffset: TextSocketOffset = _ + + private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this) + @volatile private var endpointRef: RpcEndpointRef = _ + + initialize() + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +assert(offsets.length == numPartitions) +val offs = offsets + .map(_.asInstanceOf[ContinuousRecordPartitionOffset]) + .sortBy(_.partitionId) + .map(_.offset) + .toList +TextSocketOffset(offs) --- End diff -- There is an assertion above `assert(offsets.length == numPartitions)` (option 1
[GitHub] spark pull request #21199: [SPARK-24127][SS] Continuous text socket source
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21199#discussion_r207078197 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala --- @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.{BufferedReader, InputStreamReader, IOException} +import java.net.Socket +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.{Calendar, List => JList, Locale} +import javax.annotation.concurrent.GuardedBy + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.json4s.{DefaultFormats, NoTypeHints} +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.{ContinuousRecordEndpoint, ContinuousRecordPartitionOffset, GetRecord} +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} +import org.apache.spark.util.RpcUtils + + +object TextSocketContinuousReader { --- End diff -- The companion object can be shared. But overall, I guess we need to come up with better interfaces so that the micro-batch and continuous sources can share more code. I would investigate that outside the scope of this PR.
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r207042893 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -163,7 +163,8 @@ class SourceProgress protected[sql]( val endOffset: String, val numInputRows: Long, val inputRowsPerSecond: Double, - val processedRowsPerSecond: Double) extends Serializable { + val processedRowsPerSecond: Double, + val customMetrics: Option[JValue] = None) extends Serializable { --- End diff -- Refactored to a JSON String instead of a JValue.
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21721 @HyukjinKwon, I have addressed the comments and modified SourceProgress and SinkProgress to take a String instead of a JValue so that they can be used easily from Java. Regarding the default value in the ctor, I am not sure it is an issue: the object is mostly read-only, so it would matter only if a user tries to construct it from Java. I have added overloaded ctors anyway. Please take a look. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
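To illustrate the point about default values and Java callers, here is a minimal sketch. `ProgressSketch` is a hypothetical stand-in, not Spark's `SourceProgress`, and using `null` for "no custom metrics" is an assumption made for the example.

```scala
// Hypothetical stand-in for a progress class; not Spark's actual SourceProgress.
class ProgressSketch(
    val description: String,
    val numInputRows: Long,
    val customMetrics: String) {

  // Auxiliary constructor: gives Java callers a two-argument form, because a
  // Scala default argument cannot simply be omitted in a Java constructor call.
  def this(description: String, numInputRows: Long) =
    this(description, numInputRows, null)
}
```

Both constructors are plain JVM constructors, so Java code can call either one directly.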
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206347147 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/SupportsCustomReaderMetrics.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.CustomMetrics; +import org.apache.spark.sql.sources.v2.reader.DataSourceReader; + +/** + * A mix in interface for {@link DataSourceReader}. Data source readers can implement this + * interface to report custom metrics that gets reported under the + * {@link org.apache.spark.sql.streaming.SourceProgress} + * + */ +@InterfaceStability.Evolving +public interface SupportsCustomReaderMetrics extends DataSourceReader { +/** + * Returns custom metrics specific to this data source. + */ +CustomMetrics getCustomMetrics(); + +/** + * Invoked if the custom metrics returned by {@link #getCustomMetrics()} is invalid --- End diff -- Updated javadoc to explain the same. 
[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21199 @HeartSaVioR , rebased with master. ping @jose-torres @tdas @zsxwing for review. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21721 @HeartSaVioR thanks for taking time to review. Addressed the comments, can you take a look again? Regarding the mixin interface, would like to take feedback from others. @jose-torres @tdas @zsxwing could you take a look at the patch and also comment on https://github.com/apache/spark/pull/21721#discussion_r206241038 ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206241038 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsCustomWriterMetrics.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.sources.v2.writer.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.sources.v2.CustomMetrics; +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; + +/** + * A mix in interface for {@link DataSourceWriter}. Data source writers can implement this --- End diff -- The intention was to restrict the mixin so that it can be applied only to `DataSourceReader` and `DataSourceWriter` (similar pattern followed in other mixins) by inheriting the appropriate types. Unfortunately theres no common ancestor for the mixin to inherit from so I had to duplicate the interface. Agree that its not ideal. A few options: 1. Have a common ancestor marker interface (say `DataSourceComponent`) which is the super type of `DataSourceReader` and `DataSourceWriter`. Then we can have a single mixin that is a subtype of that interface. 
We may encounter similar uses for other mixins in the future. 2. The mixin does not inherit anything (neither `DataSourceReader` nor `DataSourceWriter`). In that case we cannot restrict the types of classes the mixin can be applied to. 3. Duplicate the interfaces (the option proposed in the patch). I prefer option 1, but would like to proceed based on the feedback. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
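Option 1 could look roughly like the sketch below. Every type here is a hypothetical stand-in declared locally for illustration (including `DataSourceComponent`, the marker name suggested in the comment); none of them is Spark's actual interface.

```scala
// All names are local stand-ins for illustration, not Spark's interfaces.
trait DataSourceComponent                           // proposed common marker
trait DataSourceReader extends DataSourceComponent  // stand-in for the reader type
trait DataSourceWriter extends DataSourceComponent  // stand-in for the writer type

trait CustomMetrics { def json: String }

// A single mixin shared by readers and writers; implementing it makes the
// class a DataSourceComponent.
trait SupportsCustomMetrics extends DataSourceComponent {
  def getCustomMetrics: CustomMetrics
}

class LagReportingReader extends DataSourceReader with SupportsCustomMetrics {
  override def getCustomMetrics: CustomMetrics = new CustomMetrics {
    def json: String = """{"lag":0}"""
  }
}
```

One caveat of this sketch: the marker only guarantees that an implementer is a `DataSourceComponent`, so a class that is neither a reader nor a writer could still mix it in.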
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r206237610 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -143,18 +150,50 @@ trait ProgressReporter extends Logging { } logDebug(s"Execution stats: $executionStats") +// extracts custom metrics from readers and writers +def extractMetrics(getMetrics: () => CustomMetrics, + onInvalidMetrics: (Exception) => Unit): Option[JValue] = { + val metrics = getMetrics() + if (metrics != null) { --- End diff -- Replaced it with `Option` and `map` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
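A minimal sketch of the `Option` and `map` pattern mentioned above. It is simplified on purpose: the `onInvalidMetrics` callback and the JSON parsing from the diff are left out, and `CustomMetrics` is a local stand-in rather than the interface from the PR.

```scala
object ExtractMetricsSketch {
  // Local stand-in for the CustomMetrics interface discussed in the PR.
  trait CustomMetrics { def json: String }

  // Option(x) is None when x is null, so the explicit null check disappears
  // and the transformation is expressed with map.
  def extractMetrics(getMetrics: () => CustomMetrics): Option[String] =
    Option(getMetrics()).map(_.json)
}
```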
[GitHub] spark pull request #21819: [SPARK-24863][SS] Report Kafka offset lag as a cu...
GitHub user arunmahadevan opened a pull request: https://github.com/apache/spark/pull/21819 [SPARK-24863][SS] Report Kafka offset lag as a custom metrics ## What changes were proposed in this pull request? This builds on top of SPARK-24748 to report 'offset lag' as a custom metric for the Kafka structured streaming source. The lag is the difference between the latest offsets in Kafka at the time the metrics are reported (just after a micro-batch completes) and the latest offsets Spark has processed. It can be 0 (or close to 0) if Spark keeps up with the rate at which messages are ingested into the Kafka topics in steady state. It measures how far the Spark source has fallen behind (per partition) and can aid in tuning the application. ## How was this patch tested? Existing and new unit tests Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/arunmahadevan/spark SPARK-24863 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21819.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21819 commit 29919fe07191cf75f5a7651f8ac9434dc79c119d Author: Arun Mahadevan Date: 2018-07-06T01:51:50Z [SPARK-24748][SS] Support for reporting custom metrics via Streaming Query Progress commit 43190e9112c3d87e482d81ac8c56097c5c513012 Author: Arun Mahadevan Date: 2018-07-06T18:07:28Z Add error reporting API for custom metrics and address review comments commit 6d4165efc9c49f73141292b6c0f318f6a3cafb23 Author: Arun Mahadevan Date: 2018-07-11T17:42:17Z Added support for custom metrics in Sink and use MemorySinkV2 as an example commit bca054f978406b257bfa4c4010e7655144fc820f Author: Arun Mahadevan Date: 2018-07-11T17:59:54Z remove kafka source metrics outside the scope of this PR commit 
5e732cba85a5c2e3ed3f0487c70c1ebe4c20b75d Author: Arun Mahadevan Date: 2018-07-11T18:48:41Z Fix scala style issues Change-Id: I831719f1e9ef1437d9df2b3529bf0a288ef5d0fa commit c1fc3ca1ec2e2698d1d83ca2bd3ecbecd4da76a6 Author: Arun Mahadevan Date: 2018-07-19T20:14:40Z [SPARK-24863][SS] Report Kafka offset lag as a custom metrics --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
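The offset lag described in this pull request can be sketched as a per-partition subtraction. This is an illustration only, not the code from the patch: `TopicPartition` is a local stand-in for the Kafka client class, and treating a partition with no processed offset as starting at 0 is an assumption.

```scala
object OffsetLagSketch {
  // Local stand-in; the Kafka client library has its own TopicPartition class.
  final case class TopicPartition(topic: String, partition: Int)

  // lag = latest offset in Kafka - latest offset processed, per partition.
  def offsetLag(
      latest: Map[TopicPartition, Long],
      processed: Map[TopicPartition, Long]): Map[TopicPartition, Long] =
    latest.map { case (tp, latestOffset) =>
      // Assumption: a partition with no processed offset starts at 0.
      val lag = latestOffset - processed.getOrElse(tp, 0L)
      tp -> math.max(lag, 0L)
    }
}
```

A partition that has fully caught up reports a lag of 0, which matches the steady-state case described above.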
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21721 @jose-torres, addressed initial comments. @tdas, can you also take a look when possible ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21733 @HeartSaVioR, the results look promising. I am wondering if there's a way to make this the default rather than introducing new configs. Since this is an internal detail anyway, there's no need to expose a config if we can identify the old vs. new format by looking at the fields in the row, or by introducing a row version to differentiate the two. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21721 @jose-torres I have removed the Kafka lag metrics from this PR and added writer metrics, with the number of rows in the memory sink as an example. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21673: [SPARK-24697][SS] Fix the reported start offsets in stre...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21673 @tdas Closing this in favor of #21744 . --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21673: [SPARK-24697][SS] Fix the reported start offsets ...
Github user arunmahadevan closed the pull request at: https://github.com/apache/spark/pull/21673 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21744: [SPARK-24697][SS] Fix the reported start offsets in stre...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21744 @tdas logically this is similar to https://github.com/apache/spark/pull/21673. Yes, it makes the control flow better and LGTM. Overall the progress reporter is still tightly coupled with the internals of StreamExecution, and the complexity can be reduced by making the ProgressReporter orthogonal to StreamExecution (composition vs inheritance). Anyway, that can be handled separately. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21721 @jose-torres @HeartSaVioR, I have addressed the initial comments. I will add Writer support for custom metrics and add MemorySink as an example. I am OK with moving the Kafka custom metrics into a separate PR, but the lag metric is valuable IMO. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r200730270 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala --- @@ -95,4 +95,25 @@ private object JsonUtils { } Serialization.write(result) } + + /** + * Write per-topic partition lag as json string --- End diff -- This is the difference between the latest offsets in Kafka at the time the metrics are reported (just after a micro-batch completes) and the latest offsets Spark has processed. It can be 0 if Spark keeps up with the rate at which messages are ingested into the Kafka topics (steady state). I would assume we always want to set a reasonable micro-batch size by setting `maxOffsetsPerTrigger`. Otherwise Spark can end up processing all the data in the topics in one micro-batch (e.g. if the starting offset is set to earliest, or the streaming job is stopped for some time and restarted). IMO, we should address this by setting some sane defaults, which are currently missing. If we want to handle the custom metrics for Kafka outside the scope of this PR, I will raise a separate one, but this can be really useful for identifying issues such as data skew in some partitions, or other problems that cause Spark to not keep up with the ingestion rate. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
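As a rough illustration of "per-topic partition lag as json string", the sketch below groups lags by topic and renders them by hand. The output shape mirrors the way offsets appear in streaming progress (topic, then partition, then value), but the exact format in the patch is not shown in this thread, so treat it as an assumption. The real `JsonUtils` uses json4s `Serialization`; plain string building is used here only to keep the example self-contained.

```scala
object PartitionLagJsonSketch {
  // Input: (topic, partition) -> lag. Output is grouped by topic.
  // Assumption: topic names contain no characters that need JSON escaping.
  def partitionLagJson(lags: Map[(String, Int), Long]): String = {
    val perTopic = lags.groupBy { case ((topic, _), _) => topic }.toSeq.sortBy(_._1)
    perTopic.map { case (topic, entries) =>
      val parts = entries.toSeq
        .map { case ((_, partition), lag) => partition -> lag }
        .sortBy(_._1) // stable ordering so the output is deterministic
        .map { case (partition, lag) => s""""$partition":$lag""" }
        .mkString(",")
      s""""$topic":{$parts}"""
    }.mkString("{", ",", "}")
  }
}
```

Sorting topics and partitions is a design choice for this sketch: it makes the string deterministic, which helps when comparing progress reports across triggers.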
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r200730240 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -178,12 +180,18 @@ class SourceProgress protected[sql]( if (value.isNaN || value.isInfinity) JNothing else JDouble(value) } -("description" -> JString(description)) ~ +val jsonVal = ("description" -> JString(description)) ~ ("startOffset" -> tryParse(startOffset)) ~ ("endOffset" -> tryParse(endOffset)) ~ ("numInputRows" -> JInt(numInputRows)) ~ ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) + +if (customMetrics != null) { + jsonVal ~ ("customMetrics" -> tryParse(customMetrics.json())) --- End diff -- Currently in case of error, it just reports the JSON string as is (similar to start/end offsets). However I agree we can add error reporting to this API. Will address. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21721 @tdas @jose-torres @HeartSaVioR --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
GitHub user arunmahadevan opened a pull request: https://github.com/apache/spark/pull/21721 [SPARK-24748][SS] Support for reporting custom metrics via StreamingQuery Progress ## What changes were proposed in this pull request? Currently the Structured Streaming sources and sinks do not have a way to report custom metrics. Providing an option to report custom metrics and making them available via the streaming query progress enables sources and sinks to report custom progress information (e.g. the lag metrics for the Kafka source). Similar metrics can be reported for sinks as well, but I would like to get initial feedback before proceeding further. ## How was this patch tested? New and existing unit tests. Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/arunmahadevan/spark SPARK-24748 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21721.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21721 commit b7b2c3b1c9242fe205869f108548248f71ff8203 Author: Arun Mahadevan Date: 2018-07-06T01:51:50Z [SPARK-24748][SS] Support for reporting custom metrics via Streaming Query Progress --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21673: [SPARK-24697][SS] Fix the reported start offsets in stre...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21673 @tdas, thanks for your comments. Yes, there's a problem with the current abstraction. I didn't consider refactoring it because there have been multiple changes to this class that left the underlying structure unchanged, and the fields of ExecutionStats are already accessed from multiple places within StreamExecution. I did not think adding an extra field would increase the code complexity. However, if you plan to do a major refactoring to simplify the logic and address the issues, I am happy to discard this PR and help review your changes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21673: [SPARK-24697][SS] Fix the reported start offsets in stre...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21673 @HeartSaVioR , thanks for the inputs. Please check again. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21673: SPARK-24697: Fix the reported start offsets in streaming...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21673 @tdas @jose-torres @HeartSaVioR --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21673: SPARK-24697: Fix the reported start offsets in st...
GitHub user arunmahadevan opened a pull request: https://github.com/apache/spark/pull/21673 SPARK-24697: Fix the reported start offsets in streaming query progress ## What changes were proposed in this pull request? A streaming query reports progress during each trigger (e.g. after runBatch in MicroBatchExecution). However, the reported progress has wrong offsets, because the offsets are committed first and committedOffsets is updated to availableOffsets before the progress is reported. This leads to odd progress reports where startOffset and endOffset are always the same. ``` { "id" : "76bf5515-55be-46af-bc79-9fc92cc6d856", "runId" : "b526f0f4-24bf-4ddc-b6e8-7b0cc83bdbe8", ... "sources" : [ { "description" : "KafkaV2[Subscribe[topic2]]", "startOffset" : { "topic2" : { "0" : 44 } }, "endOffset" : { "topic2" : { "0" : 44 } }, "numInputRows" : 11, "inputRowsPerSecond" : 1.099670098970309, "processedRowsPerSecond" : 1.8829168093118795 } ], ... } ``` The fix is to remember the last committed offsets before running the batch and updating the committed offsets, and to report those last committed offsets as the start offsets in the streaming query progress. ## How was this patch tested? Existing unit tests and running sample programs. Please review http://spark.apache.org/contributing.html before opening a pull request. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/arunmahadevan/spark SPARK-24697 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21673.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21673 commit 24d75eaadfafdd668cd30d2de3bf463ce775cd69 Author: Arun Mahadevan Date: 2018-06-29T21:19:00Z SPARK-24697: Fix the reported start offsets in streaming query progress --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
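The ordering problem described in this pull request can be reduced to a small model. The sketch below is not Spark code: `Trigger` and `Progress` are hypothetical types, and offsets are plain `Long` values for a single partition.

```scala
object ProgressOffsetsSketch {
  final case class Progress(startOffset: Long, endOffset: Long)

  // Simplified model of a trigger for one partition.
  final class Trigger(var committed: Long) {
    // Problematic ordering: commit first, then read `committed` as the start
    // offset, so start and end are always equal.
    def runBatchBuggy(available: Long): Progress = {
      committed = available
      Progress(startOffset = committed, endOffset = available)
    }

    // Fixed ordering: remember the last committed offset before committing.
    def runBatchFixed(available: Long): Progress = {
      val lastCommitted = committed
      committed = available
      Progress(startOffset = lastCommitted, endOffset = available)
    }
  }
}
```

With a committed offset of 33 and 11 new rows available up to offset 44, the first variant reports 44 to 44, while the second reports 33 to 44.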
[GitHub] spark issue #21622: [SPARK-24637][SS] Add metrics regarding state and waterm...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21622 Looks good overall, a couple of minor comments. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21622: [SPARK-24637][SS] Add metrics regarding state and...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21622#discussion_r198248243 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala --- @@ -39,6 +42,23 @@ class MetricsReporter( registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0) registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L) + private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC")) + + registerGauge("eventTime-watermark", +s => convertStringDateToMillis(s.eventTime.get("watermark")), 0L) --- End diff -- 1. nit: rename `s` => `progress` to make it clearer. 2. The eventTime-watermark metric needs to be reported only if the map is not empty (event time). It could be skipped if the map is empty (processing time) to avoid confusion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
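One way to express the second point is to return an `Option`, so that an empty event-time map produces no value at all instead of a default of 0. The sketch is an assumption about how such a helper might look: `watermarkMillis` is a hypothetical name, and the timestamp pattern is assumed to be the usual ISO 8601 form with milliseconds and a literal `Z`.

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

object WatermarkGaugeSketch {
  // Returns None when the event-time map has no "watermark" entry
  // (for example, a query that does not use event time).
  def watermarkMillis(eventTime: java.util.Map[String, String]): Option[Long] =
    Option(eventTime.get("watermark")).map { ts =>
      // SimpleDateFormat is not thread-safe, so this sketch builds one per call.
      val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
      format.setTimeZone(TimeZone.getTimeZone("UTC"))
      format.parse(ts).getTime
    }
}
```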
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r198222671 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ +import org.apache.spark.util.ThreadUtils + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + // This flag will be flipped on the executors to indicate that the threads processing + // partitions of the write-side RDD have been started. These will run indefinitely + // asynchronously as epochs of the coalesce RDD complete on the read side. + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. 
Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. + */ +class ContinuousCoalesceRDD( +context: SparkContext, +numPartitions: Int, +readerQueueSize: Int, +epochIntervalMs: Long, +readerEndpointName: String, +prev: RDD[InternalRow]) + extends RDD[InternalRow](context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) + + val readerRDD = new ContinuousShuffleReadRDD( +sparkContext, +numPartitions, +readerQueueSize, +prev.getNumPartitions, +epochIntervalMs, +Seq(readerEndpointName)) + + private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool( +prev.getNumPartitions, +this.name) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +assert(split.index == 0) +// lazy initialize endpoint so writer can send to it + readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint + +if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) { + val rpcEnv = SparkEnv.get.rpcEnv + val outputPartitioner = new HashPartitioner(1) + val endpointRefs = readerRDD.endpointNames.map { endpointName => + rpcEnv.setupEndpointRef(rpcEnv.address, endpointName) + } + + val runnables = prev.partitions.map { prevSplit => +new Runnable() { + override def run(): Unit = { +TaskContext.setTaskContext(context) + +val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter( + prevSplit.index, outputPartitioner, endpointRefs.toArray) + +EpochTracker.initializeCurrentEpoch( + context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong) +while (!context.isInterrupted() && !context.isCompleted()) { + writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]]) + // Note that current epoch is a non-inheritable thread local, so each writer thread + // can properly increment its own epoch without affecting the main task thread. 
+ EpochTracker.incrementCurrentEpoch() +} + } +} + } + + context.addTaskCompletionListener { ctx => +threadPool.shutdownNow() + } + + split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true + + runnables.foreach(threadPool.execute) +} + +readerRDD.compute(readerRDD.partitions(split.index), context) --- E
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197985638 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ +import org.apache.spark.util.ThreadUtils + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + // This flag will be flipped on the executors to indicate that the threads processing + // partitions of the write-side RDD have been started. These will run indefinitely + // asynchronously as epochs of the coalesce RDD complete on the read side. + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. 
Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. + */ +class ContinuousCoalesceRDD( +context: SparkContext, +numPartitions: Int, +readerQueueSize: Int, +epochIntervalMs: Long, +readerEndpointName: String, +prev: RDD[InternalRow]) + extends RDD[InternalRow](context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) + + val readerRDD = new ContinuousShuffleReadRDD( +sparkContext, +numPartitions, +readerQueueSize, +prev.getNumPartitions, +epochIntervalMs, +Seq(readerEndpointName)) + + private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool( +prev.getNumPartitions, +this.name) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +assert(split.index == 0) +// lazy initialize endpoint so writer can send to it + readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint + +if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) { + val rpcEnv = SparkEnv.get.rpcEnv + val outputPartitioner = new HashPartitioner(1) + val endpointRefs = readerRDD.endpointNames.map { endpointName => + rpcEnv.setupEndpointRef(rpcEnv.address, endpointName) + } + + val runnables = prev.partitions.map { prevSplit => +new Runnable() { + override def run(): Unit = { +TaskContext.setTaskContext(context) + +val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter( + prevSplit.index, outputPartitioner, endpointRefs.toArray) + +EpochTracker.initializeCurrentEpoch( + context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong) +while (!context.isInterrupted() && !context.isCompleted()) { + writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]]) + // Note that current epoch is a non-inheritable thread local, so each writer thread + // can properly increment its own epoch without affecting the main task thread. 
+ EpochTracker.incrementCurrentEpoch() +} + } +} + } + + context.addTaskCompletionListener { ctx => +threadPool.shutdownNow() + } + + split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true + + runnables.foreach(threadPool.execute) +} + +readerRDD.compute(readerRDD.partitions(split.index), context) ---
[GitHub] spark pull request #21617: [SPARK-24634][SS] Add a new metric regarding numb...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21617#discussion_r197984227 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -48,12 +49,13 @@ class StateOperatorProgress private[sql]( def prettyJson: String = pretty(render(jsonValue)) private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress = -new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes) +new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, numLateInputRows) private[sql] def jsonValue: JValue = { ("numRowsTotal" -> JInt(numRowsTotal)) ~ ("numRowsUpdated" -> JInt(numRowsUpdated)) ~ -("memoryUsedBytes" -> JInt(memoryUsedBytes)) +("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~ +("numLateInputRows" -> JInt(numLateInputRows)) --- End diff -- What I meant was: if the input to the state operator is the result of the aggregate, then we would not be counting the actual input rows to the group by. There would be at most one row per key, so it would give the impression that there are not as many late events, when in reality there may be more. If this is not the case then I am fine. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
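The concern about undercounting can be shown with a toy model. Nothing here is Spark code: `Event` is a hypothetical type, "late" is simplified to "event time strictly older than the watermark", and the per-key count stands in for what an operator would see after a pre-aggregation.

```scala
object LateRowsSketch {
  final case class Event(key: String, eventTimeMs: Long)

  // Number of raw input events older than the watermark.
  def lateEvents(events: Seq[Event], watermarkMs: Long): Int =
    events.count(_.eventTimeMs < watermarkMs)

  // Number of late rows visible after a per-key aggregation: at most one per key.
  def lateKeys(events: Seq[Event], watermarkMs: Long): Int =
    events.filter(_.eventTimeMs < watermarkMs).map(_.key).distinct.size
}
```

With three late events spread over two keys, the per-key count is 2 while the number of late events is 3, which is the gap the comment is pointing at.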
[GitHub] spark pull request #21617: [SPARK-24634][SS] Add a new metric regarding numb...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21617#discussion_r197980605 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -48,12 +49,13 @@ class StateOperatorProgress private[sql]( def prettyJson: String = pretty(render(jsonValue)) private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress = -new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes) +new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, numLateInputRows) private[sql] def jsonValue: JValue = { ("numRowsTotal" -> JInt(numRowsTotal)) ~ ("numRowsUpdated" -> JInt(numRowsUpdated)) ~ -("memoryUsedBytes" -> JInt(memoryUsedBytes)) +("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~ +("numLateInputRows" -> JInt(numLateInputRows)) --- End diff -- Here you are measuring the number of "keys" filtered out of the state store since they have crossed the late threshold, correct? It may be better to rename this metric here and at other places to "number of evicted rows". It would be better if we could instead expose the actual number of events that were late.
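The distinction raised in these two comments (late input events versus late keys) can be illustrated with a minimal sketch. It is a toy model, not Spark's implementation: the `Event` type, the `LateRowCounting` object, and the watermark value are all hypothetical names introduced here for illustration.

```scala
// Hypothetical model (not Spark code): an event carries a grouping key and an event time.
final case class Event(key: String, eventTime: Long)

object LateRowCounting {
  /** Number of raw input events whose event time is older than the watermark. */
  def lateInputEvents(events: Seq[Event], watermark: Long): Int =
    events.count(_.eventTime < watermark)

  /** Number of distinct keys among the late events, i.e. at most one row per key. */
  def lateKeys(events: Seq[Event], watermark: Long): Int =
    events.filter(_.eventTime < watermark).map(_.key).distinct.size

  def main(args: Array[String]): Unit = {
    val watermark = 100L
    // Three late events share the key "a"; the event for "b" is on time.
    val events = Seq(Event("a", 90L), Event("a", 95L), Event("a", 99L), Event("b", 120L))
    println(s"late input events: ${lateInputEvents(events, watermark)}")
    println(s"late keys: ${lateKeys(events, watermark)}")
  }
}
```

If the metric is computed on rows that have already been grouped by key, it behaves like `lateKeys` and can undercount relative to `lateInputEvents`, which is the concern expressed above.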
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197526872 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ +import org.apache.spark.util.ThreadUtils + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + // This flag will be flipped on the executors to indicate that the threads processing + // partitions of the write-side RDD have been started. These will run indefinitely + // asynchronously as epochs of the coalesce RDD complete on the read side. + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. 
Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. + */ +class ContinuousCoalesceRDD( +context: SparkContext, +numPartitions: Int, +readerQueueSize: Int, +epochIntervalMs: Long, +readerEndpointName: String, +prev: RDD[InternalRow]) + extends RDD[InternalRow](context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) --- End diff -- Agree. And since there's an `assert (numpartitions == 1)` in `ContinuousCoalesceExec`, we can probably create an array of `numPartitions` here.
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197564080 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ +import org.apache.spark.util.ThreadUtils + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + // This flag will be flipped on the executors to indicate that the threads processing + // partitions of the write-side RDD have been started. These will run indefinitely + // asynchronously as epochs of the coalesce RDD complete on the read side. + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. 
Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. + */ +class ContinuousCoalesceRDD( +context: SparkContext, +numPartitions: Int, +readerQueueSize: Int, +epochIntervalMs: Long, +readerEndpointName: String, +prev: RDD[InternalRow]) + extends RDD[InternalRow](context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) + + val readerRDD = new ContinuousShuffleReadRDD( +sparkContext, +numPartitions, +readerQueueSize, +prev.getNumPartitions, +epochIntervalMs, +Seq(readerEndpointName)) + + private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool( +prev.getNumPartitions, +this.name) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +assert(split.index == 0) +// lazy initialize endpoint so writer can send to it + readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint + +if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) { + val rpcEnv = SparkEnv.get.rpcEnv + val outputPartitioner = new HashPartitioner(1) + val endpointRefs = readerRDD.endpointNames.map { endpointName => + rpcEnv.setupEndpointRef(rpcEnv.address, endpointName) + } + + val runnables = prev.partitions.map { prevSplit => +new Runnable() { + override def run(): Unit = { +TaskContext.setTaskContext(context) + +val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter( + prevSplit.index, outputPartitioner, endpointRefs.toArray) + +EpochTracker.initializeCurrentEpoch( + context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong) +while (!context.isInterrupted() && !context.isCompleted()) { + writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]]) + // Note that current epoch is a non-inheritable thread local, so each writer thread + // can properly increment its own epoch without affecting the main task thread. 
+ EpochTracker.incrementCurrentEpoch() +} + } +} + } + + context.addTaskCompletionListener { ctx => +threadPool.shutdownNow() + } + + split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized = true + + runnables.foreach(threadPool.execute) +} + +readerRDD.compute(readerRDD.partitions(split.index), context) --- E
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197561226 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDDPartition, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.streaming.continuous.shuffle._ +import org.apache.spark.util.ThreadUtils + +case class ContinuousCoalesceRDDPartition(index: Int) extends Partition { + // This flag will be flipped on the executors to indicate that the threads processing + // partitions of the write-side RDD have been started. These will run indefinitely + // asynchronously as epochs of the coalesce RDD complete on the read side. + private[continuous] var writersInitialized: Boolean = false +} + +/** + * RDD for continuous coalescing. 
Asynchronously writes all partitions of `prev` into a local + * continuous shuffle, and then reads them in the task thread using `reader`. + */ +class ContinuousCoalesceRDD( +context: SparkContext, +numPartitions: Int, +readerQueueSize: Int, +epochIntervalMs: Long, +readerEndpointName: String, +prev: RDD[InternalRow]) + extends RDD[InternalRow](context, Nil) { + + override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0)) + + val readerRDD = new ContinuousShuffleReadRDD( +sparkContext, +numPartitions, +readerQueueSize, +prev.getNumPartitions, +epochIntervalMs, +Seq(readerEndpointName)) + + private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool( +prev.getNumPartitions, +this.name) + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { +assert(split.index == 0) +// lazy initialize endpoint so writer can send to it + readerRDD.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint + +if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) { + val rpcEnv = SparkEnv.get.rpcEnv + val outputPartitioner = new HashPartitioner(1) --- End diff -- Maybe I am missing something. Is this more like a re-partition (just a shuffle) than a coalesce?
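As background for the question about `new HashPartitioner(1)`: a hash partitioner with a single output partition sends every key to partition 0, so all writer threads feed one reader. The sketch below is a simplified stand-in written for illustration, assuming the usual "non-negative hash modulo partition count" rule; `SinglePartitionSketch` and `partitionFor` are hypothetical names, and the code does not settle whether the operation is better described as a coalesce or a repartition.

```scala
object SinglePartitionSketch {
  // Simplified stand-in for a hash partitioner: non-negative hashCode modulo numPartitions.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    val raw = if (key == null) 0 else key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  def main(args: Array[String]): Unit = {
    val keys = Seq[Any]("a", 42, -7, 3.14)
    // With a single output partition every key maps to index 0.
    keys.foreach(k => println(s"$k -> ${partitionFor(k, 1)}"))
  }
}
```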
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197523482 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -349,6 +349,17 @@ object UnsupportedOperationChecker { _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias | _: TypedFilter) => case node if node.nodeName == "StreamingRelationV2" => +case Repartition(1, false, _) => +case node: Aggregate => + val aboveSinglePartitionCoalesce = node.find { +case Repartition(1, false, _) => true +case _ => false + }.isDefined + + if (!aboveSinglePartitionCoalesce) { --- End diff -- Also, if there's a single parent partition and there's a `Repartition(1)`, that node should probably be removed. Not sure if this is already being done.
[GitHub] spark pull request #21560: [SPARK-24386][SS] coalesce(1) aggregates in conti...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21560#discussion_r197520939 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -349,6 +349,17 @@ object UnsupportedOperationChecker { _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias | _: TypedFilter) => case node if node.nodeName == "StreamingRelationV2" => +case Repartition(1, false, _) => +case node: Aggregate => + val aboveSinglePartitionCoalesce = node.find { +case Repartition(1, false, _) => true +case _ => false + }.isDefined + + if (!aboveSinglePartitionCoalesce) { --- End diff -- What if there was only a single partition to begin with? Then there's no need for Repartition(1) and this check should be skipped.
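The check in the quoted diff, and the relaxation suggested in these two comments, can be sketched on a toy plan tree. Everything below is a hypothetical model: the `Plan`, `Source`, `Repartition`, and `Aggregate` types are small stand-ins and not Catalyst's classes, and `inputPartitions` assumes the partition count is known when the check runs, which may not hold in the real analyzer.

```scala
// Toy logical plan (hypothetical, not Catalyst): each node exposes its children.
sealed trait Plan {
  def children: Seq[Plan]
  // Pre-order search for the first node satisfying the predicate.
  def find(p: Plan => Boolean): Option[Plan] =
    if (p(this)) Some(this)
    else children.foldLeft(Option.empty[Plan])((acc, c) => acc.orElse(c.find(p)))
}
final case class Source(numPartitions: Int) extends Plan {
  def children: Seq[Plan] = Nil
}
final case class Repartition(numPartitions: Int, shuffle: Boolean, child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}
final case class Aggregate(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}

object ContinuousAggregateCheck {
  // Mirrors the diff: the aggregate is accepted only above a coalesce(1).
  def aggregateAllowed(agg: Aggregate): Boolean =
    agg.find {
      case Repartition(1, false, _) => true
      case _ => false
    }.isDefined

  // Assumption for illustration: the number of input partitions can be read off the plan.
  def inputPartitions(plan: Plan): Int = plan match {
    case Source(n) => n
    case Repartition(n, _, _) => n
    case Aggregate(child) => inputPartitions(child)
  }

  // Relaxed variant: also accept an aggregate whose input already has one partition.
  def aggregateAllowedRelaxed(agg: Aggregate): Boolean =
    aggregateAllowed(agg) || inputPartitions(agg.child) == 1
}
```

Under this model `Aggregate(Source(1))` is rejected by the original check but accepted by the relaxed one, which is the case the comments ask about.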
[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21504 @HyukjinKwon, addressed comments. Can you take it forward?
[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21504#discussion_r194817758 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala --- @@ -96,6 +96,14 @@ object StaticSQLConf { .toSequence .createOptional + val STREAMING_QUERY_LISTENERS = buildStaticConf("spark.sql.streamingQueryListeners") --- End diff -- ok makes sense. renamed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21469#discussion_r194592510 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala --- @@ -112,14 +122,19 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan => val storeMetrics = store.metrics longMetric("numTotalStateRows") += storeMetrics.numKeys longMetric("stateMemory") += storeMetrics.memoryUsedBytes -storeMetrics.customMetrics.foreach { case (metric, value) => - longMetric(metric.name) += value +storeMetrics.customMetrics.foreach { + case (metric: StateStoreCustomAverageMetric, value) => +longMetric(metric.name).set(value * 1.0d) --- End diff -- Not sure if SQLAppstatusListener comes into play for reporting query progress (e.g. StreamingQueryWrapper.lastProgress): https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala#L193 Based on my understanding, SQLMetric is an Accumulator, so the merged value of the accumulators across all the tasks is returned. The merge operation in SQLMetric just adds the values, so it makes sense only for count or size values. For now we would be able to display the (min, med, max) values in the UI but not in the "query status". I was thinking that if we make it a count metric, it may work (similar to the number of state rows). I am fine either way.
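The point about additive merging can be shown with a small self-contained sketch. `SumMetric` below is a hypothetical stand-in for an accumulator whose merge adds values, as the comment describes for SQLMetric; it is not Spark's class, and the per-task numbers are made up for illustration.

```scala
object AccumulatorMergeSketch {
  // Hypothetical accumulator whose merge is plain addition.
  final class SumMetric {
    private var value: Long = 0L
    def set(v: Long): Unit = { value = v }
    def merge(other: SumMetric): Unit = { value += other.get }
    def get: Long = value
  }

  /** Simulates each task setting its own value, then the driver merging all of them. */
  def mergeAll(perTaskValues: Seq[Long]): Long = {
    val driver = new SumMetric
    perTaskValues.foreach { v =>
      val task = new SumMetric
      task.set(v)
      driver.merge(task)
    }
    driver.get
  }

  def main(args: Array[String]): Unit = {
    // Counts add up to a meaningful total across tasks.
    println(s"merged row counts: ${mergeAll(Seq(10L, 20L, 30L))}")
    // Per-task averages of 3 merge to 9, which is not an average of anything.
    println(s"merged averages: ${mergeAll(Seq(3L, 3L, 3L))}")
  }
}
```

This is why a value that is `set` per task reads sensibly only if it is a count or a size; an average would need a different merge rule or a per-task display such as (min, med, max).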
[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21469#discussion_r194483603 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala --- @@ -112,14 +122,19 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan => val storeMetrics = store.metrics longMetric("numTotalStateRows") += storeMetrics.numKeys longMetric("stateMemory") += storeMetrics.memoryUsedBytes -storeMetrics.customMetrics.foreach { case (metric, value) => - longMetric(metric.name) += value +storeMetrics.customMetrics.foreach { + case (metric: StateStoreCustomAverageMetric, value) => +longMetric(metric.name).set(value * 1.0d) --- End diff -- How does this get accumulated? It seems the last value set may get propagated to the driver.
[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21469#discussion_r194480087 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala --- @@ -247,6 +253,14 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit private lazy val fm = CheckpointFileManager.create(baseDir, hadoopConf) private lazy val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf) + private lazy val metricProviderLoaderMapSizeBytes: StateStoreCustomSizeMetric = +StateStoreCustomSizeMetric("providerLoadedMapSizeBytes", + "estimated size of states cache in provider") + + private lazy val metricProviderLoaderCountOfVersionsInMap: StateStoreCustomAverageMetric = --- End diff -- Why is "metricProviderLoaderCountOfVersionsInMap" an average metric? The other metrics like "numTotalStateRows" and even "providerLoadedMapSizeBytes" are count metrics. Shouldn't this be similar?
[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21504#discussion_r194101270 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala --- @@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo @GuardedBy("awaitTerminationLock") private var lastTerminatedQuery: StreamingQuery = null + try { +sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames => + Utils.loadExtensions(classOf[StreamingQueryListener], classNames, +sparkSession.sparkContext.conf).foreach(listener => { +addListener(listener) +logInfo(s"Registered listener ${listener.getClass.getName}") + }) +} + } catch { +case e: Exception => + throw new SparkException(s"Exception when registering StreamingQueryListener", e) --- End diff -- Addressed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21504#discussion_r194100709 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala --- @@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo @GuardedBy("awaitTerminationLock") private var lastTerminatedQuery: StreamingQuery = null + try { +sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames => + Utils.loadExtensions(classOf[StreamingQueryListener], classNames, +sparkSession.sparkContext.conf).foreach(listener => { +addListener(listener) +logInfo(s"Registered listener ${listener.getClass.getName}") --- End diff -- Since it's logged only once and provides information to the user, I guess info is fine. A similar pattern is here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L2359
[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21504#discussion_r193923588 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala --- @@ -55,6 +56,11 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo @GuardedBy("awaitTerminationLock") private var lastTerminatedQuery: StreamingQuery = null + sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames => +Utils.loadExtensions(classOf[StreamingQueryListener], classNames, + sparkSession.sparkContext.conf).foreach(addListener) + } + --- End diff -- Good point. Addressed, please check. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21504 @HyukjinKwon, thanks for reviewing. Addressed comments.
[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21500 Clearing the map after each commit might make things worse, since the maps need to be loaded from the snapshot + delta files for the next micro-batch. Setting `spark.sql.streaming.minBatchesToRetain` to a lower value might address the memory consumption to some extent. Maybe we need to explore how to avoid maintaining multiple copies of the state in memory within the HDFS state store, or even explore RocksDB for incremental checkpointing.
[GitHub] spark pull request #21504: SPARK-24480: Added config for registering streami...
GitHub user arunmahadevan opened a pull request: https://github.com/apache/spark/pull/21504 SPARK-24480: Added config for registering streamingQueryListeners ## What changes were proposed in this pull request? Currently a "StreamingQueryListener" can only be registered programmatically. We could have a new config "spark.sql.streamingQueryListeners" similar to "spark.sql.queryExecutionListeners" and "spark.extraListeners" for users to register custom streaming listeners. ## How was this patch tested? New unit test and running example programs. Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/arunmahadevan/spark SPARK-24480 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21504.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21504 commit d3a3baa8bba65da6a30d1d449f97cbeb467ca14b Author: Arun Mahadevan Date: 2018-06-07T00:57:22Z SPARK-24480: Added config for registering streamingQueryListeners
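The general idea behind a listener config of this kind is to turn a comma-separated list of class names into listener instances via reflection. The sketch below shows that idea in isolation, under stated assumptions: `QueryListener`, `LoggingListener`, and `loadListeners` are hypothetical names, only a no-argument constructor is supported, and this is not the `Utils.loadExtensions` helper that the patch itself calls.

```scala
object ListenerLoadingSketch {
  // Hypothetical listener interface standing in for StreamingQueryListener.
  trait QueryListener { def name: String }

  // Example listener with a public no-argument constructor.
  class LoggingListener extends QueryListener { val name: String = "logging" }

  /** Instantiates every class named in a comma-separated config value. */
  def loadListeners(classNames: String): Seq[QueryListener] =
    classNames.split(",").map(_.trim).filter(_.nonEmpty).toSeq.map { className =>
      val cls = Class.forName(className)
      // Fail early with a clear message if the class is not a listener.
      require(classOf[QueryListener].isAssignableFrom(cls),
        s"$className does not implement QueryListener")
      cls.getDeclaredConstructor().newInstance().asInstanceOf[QueryListener]
    }

  def main(args: Array[String]): Unit = {
    // The string below plays the role of the config value described in the pull request.
    val configured = classOf[LoggingListener].getName
    loadListeners(configured).foreach(l => println(s"Registered listener ${l.name}"))
  }
}
```

In an actual deployment the class names would come from the Spark configuration key; the review thread notes that the key was renamed during review, so the exact name should be checked against the documentation for the Spark version in use.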
[GitHub] spark issue #21504: SPARK-24480: Added config for registering streamingQuery...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21504 ping @tdas @jose-torres @HeartSaVioR
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21469 Nice, LGTM.