[GitHub] spark pull request #18840: [SPARK-21565] Propagate metadata in attribute rep...
GitHub user joseph-torres opened a pull request:

    https://github.com/apache/spark/pull/18840

[SPARK-21565] Propagate metadata in attribute replacement.

## What changes were proposed in this pull request?

Propagate metadata in attribute replacement during streaming execution. This is necessary for EventTimeWatermarks consuming replaced attributes.

## How was this patch tested?

new unit test, which was verified to fail before the fix

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joseph-torres/spark SPARK-21565

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18840.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #18840

commit e54d81200569c2260f0995b2f91aa9829dc10ad7
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-04T03:52:57Z

    Propagate metadata in attribute replacement.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18925: [SPARK-21713][SC] Replace streaming bit with OutputMode
Github user joseph-torres commented on the issue:

    https://github.com/apache/spark/pull/18925

@marmbrus
[GitHub] spark pull request #18925: [SPARK-21713][SC] Replace streaming bit with Outp...
GitHub user joseph-torres opened a pull request:

    https://github.com/apache/spark/pull/18925

[SPARK-21713][SC] Replace streaming bit with OutputMode

## What changes were proposed in this pull request?

* Replace LogicalPlan.isStreaming with output mode.
* Replace Deduplicate.streaming with output mode.

Note that this is an implementation-only change, so it deliberately does not change isStreaming in the Dataset API.

## How was this patch tested?

refactoring only - ran existing unit tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joseph-torres/spark SC-8027

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18925.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #18925

commit 4d8fe6e3e0300b847cbf11a8c29b9cda696bb238
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-10T16:59:40Z

    Rename FlatMapGroupsWithState.outputMode to funcOutputMode.

commit aacf0592e7720e4784377714673cc4d2151be66d
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-10T16:24:56Z

    partial commit

commit e23b1d0c7563eee61697b7a3a4a1a3f6fe1d
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-10T17:54:54Z

    Replace LogicalPlan.isStreaming with outputMode Append() and Complete().

commit d2f7e604f58071f7f14c68f9760e1b5d0b705487
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-11T17:25:52Z

    Replace Deduplicate.streaming with output mode.
[GitHub] spark issue #18925: [SPARK-21713][SC] Replace streaming bit with OutputMode
Github user joseph-torres commented on the issue:

    https://github.com/apache/spark/pull/18925

@tdas - please review
[GitHub] spark pull request #18925: [SPARK-21713][SC] Replace streaming bit with Outp...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18925#discussion_r132795536

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -779,10 +780,16 @@ case object OneRowRelation extends LeafNode {
 }
 
 /** A logical plan for `dropDuplicates`. */
+case object Deduplicate {
+  def apply(keys: Seq[Attribute], child: LogicalPlan): Deduplicate = {
+    Deduplicate(keys, child, child.outputMode)
+  }
+}
+
 case class Deduplicate(
     keys: Seq[Attribute],
     child: LogicalPlan,
-    streaming: Boolean) extends UnaryNode {
+    originalOutputMode: OutputMode) extends UnaryNode {
--- End diff --

The intent here is that callers who need a Deduplicate will use the two-argument form in the object, which will then use the constructor to preserve the output mode of the child. A val defined inside the case class isn't accounted for by copy(), which caused test failures when I tried it.
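The construction pattern described above can be sketched outside Spark. This is a simplified stand-in (the names `Dedup`, `Source`, `Plan` and the two-case `OutputMode` are illustrative, not the real Catalyst classes), showing why a constructor parameter survives `copy()` while a val computed in the class body would not:

```scala
sealed trait OutputMode
case object Append extends OutputMode
case object Complete extends OutputMode

trait Plan { def outputMode: OutputMode }
case class Source(outputMode: OutputMode) extends Plan

object Dedup {
  // Two-argument form: snapshots the child's output mode at construction time.
  def apply(keys: Seq[String], child: Plan): Dedup =
    Dedup(keys, child, child.outputMode)
}

// Because originalOutputMode is a constructor parameter (not a val computed
// in the class body), copy() carries it along automatically.
case class Dedup(keys: Seq[String], child: Plan, originalOutputMode: OutputMode)
```

A val in the class body is invisible to the compiler-generated `copy()`, so any transformation that copies the node would silently reset it; making it a constructor parameter avoids that.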
[GitHub] spark pull request #18790: [SPARK-21587][SS] Added pushdown through watermar...
GitHub user joseph-torres reopened a pull request:

    https://github.com/apache/spark/pull/18790

[SPARK-21587][SS] Added pushdown through watermarks.

## What changes were proposed in this pull request?

* Filter predicates can be pushed through EventTimeWatermark if they're deterministic and do not reference the watermarked attribute.
* Projects can be pushed through EventTimeWatermark if they include the watermarked attribute.
* Limits can be pushed through EventTimeWatermark.

## How was this patch tested?

unit tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joseph-torres/spark SPARK-21587

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18790.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #18790

commit 9cc8da5dbc5be9b7b663e002097214e3c0720801
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-07-31T22:58:03Z

    Added pushdown below watermarks.

commit 154d34820be73f7d20bf1119fb385940d0ce6455
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-01T16:47:32Z

    Push Project, Limit, and Filter through watermark when appropriate.

commit 84575b60609a3efc9824eba96541011a99313a63
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-07T20:50:29Z

    Remove pushdown limit through watermark.

commit 4cae8973f52078afae2a9d92d59c91edaab0ba88
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-09T01:56:46Z

    remove leaked import
[GitHub] spark pull request #18790: [SPARK-21587][SS] Added pushdown through watermar...
Github user joseph-torres closed the pull request at:

    https://github.com/apache/spark/pull/18790
[GitHub] spark issue #18790: [SPARK-21587][SS] Added pushdown through watermarks.
Github user joseph-torres commented on the issue:

    https://github.com/apache/spark/pull/18790

I'm told I can reopen this instead of making a new PR for the same branch. Reopening with fixed commit history.
[GitHub] spark pull request #18889: [SPARK-21587][SS] Added pushdown through watermar...
Github user joseph-torres closed the pull request at:

    https://github.com/apache/spark/pull/18889
[GitHub] spark pull request #18889: [SPARK-21587][SS] Added pushdown through watermar...
GitHub user joseph-torres opened a pull request:

    https://github.com/apache/spark/pull/18889

[SPARK-21587][SS] Added pushdown through watermarks.

## What changes were proposed in this pull request?

Filter predicates can be pushed through EventTimeWatermark if they're deterministic and do not reference the watermarked attribute. Projects can be pushed through EventTimeWatermark if they include the watermarked attribute.

This is a copy of PR 18790, which I had to make so I could build the right commit history.

## How was this patch tested?

new unit tests

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joseph-torres/spark SPARK-21587

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18889.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #18889

commit 9cc8da5dbc5be9b7b663e002097214e3c0720801
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-07-31T22:58:03Z

    Added pushdown below watermarks.

commit 154d34820be73f7d20bf1119fb385940d0ce6455
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-01T16:47:32Z

    Push Project, Limit, and Filter through watermark when appropriate.

commit 84575b60609a3efc9824eba96541011a99313a63
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-08-07T20:50:29Z

    Remove pushdown limit through watermark.
[GitHub] spark issue #18790: [SPARK-21587][SS] Added pushdown through watermarks.
Github user joseph-torres commented on the issue:

    https://github.com/apache/spark/pull/18790

Created https://github.com/apache/spark/pull/18889 with everything cherry-picked into the right place.
[GitHub] spark pull request #18790: Added pushdown through watermarks.
GitHub user joseph-torres opened a pull request:

    https://github.com/apache/spark/pull/18790

Added pushdown through watermarks.

## What changes were proposed in this pull request?

Deterministic filter predicates can now be pushed through an EventTimeWatermark.

## How was this patch tested?

unit tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joseph-torres/spark SPARK-21587

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18790.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #18790

commit 0a837ae6f0a79409c2ab8642a0537253f309754b
Author: Jose Torres <joseph-tor...@databricks.com>
Date:   2017-07-31T22:58:03Z

    Added pushdown below watermarks.
[GitHub] spark issue #18790: [SPARK-21587][SS] Added filter pushdown through watermar...
Github user joseph-torres commented on the issue:

    https://github.com/apache/spark/pull/18790

done
[GitHub] spark issue #18790: [SPARK-21587][SS] Added pushdown through watermarks.
Github user joseph-torres commented on the issue:

    https://github.com/apache/spark/pull/18790

Agreed. I've restricted this PR to just filter, since the original story was about enabling partition pruning for filters above the watermark.
[GitHub] spark pull request #18790: [SPARK-21587][SS] Added pushdown through watermar...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18790#discussion_r132238309

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -872,6 +886,25 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
       pushDownPredicate(filter, u.child) { predicate =>
         u.withNewChildren(Seq(Filter(predicate, u.child)))
       }
+
+    case filter @ Filter(condition, watermark: EventTimeWatermark) =>
--- End diff --

Done.
[GitHub] spark pull request #18790: [SPARK-21587][SS] Added pushdown through watermar...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18790#discussion_r131761258

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -872,6 +886,25 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
       pushDownPredicate(filter, u.child) { predicate =>
         u.withNewChildren(Seq(Filter(predicate, u.child)))
       }
+
+    case filter @ Filter(condition, watermark: EventTimeWatermark) =>
--- End diff --

For filter, the logic has a subtle additional condition. We don't want to push down filters on the watermark attribute, because:

* they'll be at least as expensive to evaluate as the watermark operator
* partition pruning shouldn't apply, since there won't be useful partitions on an event time

For project, I don't see a rule for UnaryNode anywhere. I might have missed it.

For limit, I actually removed the rule for EventTimeWatermark that I originally added, since it does drop rows in some situations.

So I don't think that making EventTimeWatermark subclass UnaryNode would avoid any of the code in this PR. I agree it seems appropriate, but it also seems orthogonal.
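The pushdown condition discussed above — deterministic, and no reference to the watermarked attribute — can be modeled in isolation. This is a hypothetical simplification (the `Pred` type and the function name are illustrative; the real rule works on Catalyst expressions and attribute references, not strings):

```scala
// Stand-in for a filter predicate: the attributes it references and
// whether it is deterministic.
case class Pred(references: Set[String], deterministic: Boolean)

// A predicate may move below the watermark operator only if it is
// deterministic and does not touch the watermarked attribute.
def canPushThroughWatermark(p: Pred, watermarkAttr: String): Boolean =
  p.deterministic && !p.references.contains(watermarkAttr)
```

A filter on `symbol` would be pushed down to enable partition pruning at the source, while a filter on `eventTime` itself, or a nondeterministic one such as `rand() > 0.5`, stays above the watermark.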
[GitHub] spark pull request #18840: [SPARK-21565] Propagate metadata in attribute rep...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18840#discussion_r131707863

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -391,6 +391,30 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     checkDataset[Long](df, 1L to 100L: _*)
   }
 
+  test("SPARK-21565: watermark operator accepts attributes from replacement") {
+    withTempDir { dir =>
+      dir.delete()
+
+      val df = Seq(("a", 100.0, new java.sql.Timestamp(100L)))
+        .toDF("symbol", "price", "eventTime")
+      df.write.json(dir.getCanonicalPath)
+
+      val input = spark.readStream.schema(df.schema)
+        .json(dir.getCanonicalPath)
+
+      val groupEvents = input
+        .withWatermark("eventTime", "2 seconds")
+        .groupBy("symbol", "eventTime")
+        .agg(count("price") as 'count)
+        .select("symbol", "eventTime", "count")
+      val q = groupEvents.writeStream
+        .outputMode("append")
+        .format("console")
+        .start()
+      q.processAllAvailable()
--- End diff --

Done.
[GitHub] spark pull request #18973: [SPARK-21765] Set isStreaming on leaf nodes for s...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18973#discussion_r133844671

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala ---
@@ -564,10 +564,14 @@ class SparkSession private(
    */
   private[sql] def internalCreateDataFrame(
       catalystRows: RDD[InternalRow],
-      schema: StructType): DataFrame = {
+      schema: StructType,
+      isStreaming: Boolean = false): DataFrame = {
     // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
     // schema differs from the existing schema on any field data type.
-    val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
+    val logicalPlan = LogicalRDD(
+      schema.toAttributes,
+      catalystRows,
+      isStreaming = isStreaming)(self)
--- End diff --

It's necessary here because there are two other default arguments in the constructor.
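The point about default arguments can be shown with a simplified stand-in (this is not the real LogicalRDD signature; parameter names here are illustrative): when a parameter sits after other defaulted parameters, passing it positionally would bind it to the wrong slot, so it must be named.

```scala
// Hypothetical constructor shape: isStreaming comes after two other
// defaulted parameters, so a bare positional Boolean would bind to
// outputOrdering's slot, not isStreaming's.
def makeLogicalRDD(
    output: Seq[String],
    outputOrdering: Seq[String] = Nil,
    outputPartitioning: Option[Int] = None,
    isStreaming: Boolean = false): Boolean =
  isStreaming

// The named argument skips the intervening defaults:
val streaming = makeLogicalRDD(Seq("a"), isStreaming = true)
```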
[GitHub] spark pull request #18973: [SPARK-21765] Set isStreaming on leaf nodes for s...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18973#discussion_r133844681

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala ---
@@ -75,7 +75,7 @@ class LogicalPlanSuite extends SparkFunSuite {
     val relation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)())
     val incrementalRelation = new LocalRelation(
       Seq(AttributeReference("a", IntegerType, nullable = true)())) {
-      override def isStreaming(): Boolean = true
+      override val isStreaming: Boolean = true
--- End diff --

Done.
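The change above swaps an overriding `def` for a `val`. In Scala, a stable `val` may override a parameterless method, which is what makes this legal; a minimal sketch with a stand-in trait (not the real LogicalPlan hierarchy):

```scala
// A parameterless def in the base type...
trait Rel { def isStreaming: Boolean = false }

// ...can be overridden by a val in a subtype. Each access then reads a
// field instead of invoking a method.
val streamingRel = new Rel { override val isStreaming: Boolean = true }
val batchRel = new Rel {}
```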
[GitHub] spark pull request #18973: [SPARK-21765] Set isStreaming on leaf nodes for s...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18973#discussion_r133844686

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala ---
@@ -43,7 +43,9 @@ object LocalRelation {
   }
 }
 
-case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
+case class LocalRelation(output: Seq[Attribute],
--- End diff --

Done. (I think this is a correct summary?)
[GitHub] spark pull request #18925: [SPARK-21713][SC] Replace streaming bit with Outp...
Github user joseph-torres closed the pull request at:

    https://github.com/apache/spark/pull/18925
[GitHub] spark issue #18925: [SPARK-21713][SC] Replace streaming bit with OutputMode
Github user joseph-torres commented on the issue:

    https://github.com/apache/spark/pull/18925

The underlying JIRA ticket has been resolved as Won't Fix, because this model doesn't seem better.
[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...
GitHub user joseph-torres reopened a pull request:

    https://github.com/apache/spark/pull/19239

[SPARK-22017] Take minimum of all watermark execs in StreamExecution.

## What changes were proposed in this pull request?

Take the minimum of all watermark exec nodes as the "real" watermark in StreamExecution, rather than picking one arbitrarily.

## How was this patch tested?

new unit test

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joseph-torres/spark SPARK-22017

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19239.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19239

commit 4a7f53fdab1e5e640e156a4a3d2ba27837195195
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-13T21:49:23Z

    Implement multiple watermark StreamExecution support.

commit 9b9cd19106fae9a2de268eb2b559ca1bf159e9c2
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-14T18:30:40Z

    partially fix test

commit 6a4c80b696f42a445c7f846fada3f823e04bd3ab
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-14T21:52:16Z

    Finish rewriting test

commit 484940e5eb4d1eac1c5ec81f475681c9241bbab2
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-14T22:24:36Z

    make IncrementalExecution.offsetSeqMetadata non-private

commit 032f55503c8d424390da1ff85054e3a01e7489eb
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-14T23:22:22Z

    properly name test dataframes

commit d7f5f60c6be5bf228c960c3549eb81ed869f0227
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-14T23:39:22Z

    Combine test helper functions.

commit 2f07f90423d87985322975f8ad5aef8f70f28066
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-15T01:18:12Z

    Key watermarks by relative position rather than attribute.

commit 8b605384d77fdeb63b28feabee74284a5ab1409a
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-15T02:05:14Z

    Address test comments.
[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...
Github user joseph-torres closed the pull request at:

    https://github.com/apache/spark/pull/19239
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
GitHub user joseph-torres opened a pull request:

    https://github.com/apache/spark/pull/19327

[WIP] Implement stream-stream outer joins.

## What changes were proposed in this pull request?

Allow one-sided outer joins between two streams when a watermark is defined.

## How was this patch tested?

new unit tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joseph-torres/spark outerjoin

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19327.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19327

commit 1dfb95d5b7dee86b45aa831278d3fa7a7dc1917f
Author: Jose Torres <j...@databricks.com>
Date:   2017-09-22T20:36:50Z

    Implement stream-stream outer joins.
[GitHub] spark issue #19212: [SPARK-21988] Add default stats to StreamingExecutionRel...
Github user joseph-torres commented on the issue:

    https://github.com/apache/spark/pull/19212

@zsxwing for review
[GitHub] spark issue #19239: [SPARK-22017] Take minimum of all watermark execs in Str...
Github user joseph-torres commented on the issue:

    https://github.com/apache/spark/pull/19239

addressed comments
[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19239#discussion_r139028613

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala ---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     )
   }
 
+  test("watermark with 2 streams") {
+    val first = MemoryStream[Int]
+
+    val firstAggregation = first.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val second = MemoryStream[Int]
+
+    val secondAggregation = second.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select('value)
+
+    val union = firstAggregation.union(secondAggregation)
+      .writeStream
+      .format("memory")
+      .queryName("test")
+      .start()
+
+    def populateNewWatermarkFromData(stream: MemoryStream[Int], data: Int*): Unit = {
+      stream.addData(data)
+      union.processAllAvailable()
+      // add a dummy batch so lastExecution has the new watermark
+      stream.addData(0)
+      union.processAllAvailable()
+    }
+
+    def assertQueryWatermark(watermark: Int): Unit = {
+      assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery
+        .lastExecution.offsetSeqMetadata.batchWatermarkMs
+        == watermark)
+    }
+
+    populateNewWatermarkFromData(first, 11)
--- End diff --

The problem is that watermark recalculation happens at the beginning of each batch, and to sequence executions I have to call CheckData or CheckLastBatch. So that method ends up producing a test multiple times longer, since a single entry is:

    AddData(realData)
    CheckLastBatch
    AddData(0)
    CheckLastBatch
    AssertOnQuery
[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19239#discussion_r139029401

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -560,13 +567,24 @@ class StreamExecution(
     }
 
     if (hasNewData) {
       var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-      // Update the eventTime watermark if we find one in the plan.
+      // Update the eventTime watermarks if we find any in the plan.
--- End diff --

Well, we're updating multiple watermarks in the map. We later update `offsetSeqMetadata` with the new minimum one, but that's not in this block.
[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139029187

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -130,6 +130,13 @@ class StreamExecution(
   protected var offsetSeqMetadata = OffsetSeqMetadata(
     batchWatermarkMs = 0, batchTimestampMs = 0, sparkSession.conf)

+  /**
+   * A map from watermarked attributes to their current watermark. The minimum watermark
+   * timestamp present here will be used as the overall query watermark in offsetSeqMetadata;
+   * the query watermark is what's logged and used to age out old state.
+   */
+  protected var attributeWatermarkMsMap: AttributeMap[Long] = AttributeMap(Seq())
--- End diff --

This map has to persist and get updated across batches, and I'm not sure how to do that with a local variable or a val.
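The bookkeeping this map enables can be illustrated with a small plain-Python sketch. This is not Spark code; the names are hypothetical, and the real logic lives in StreamExecution using an AttributeMap[Long]. The idea is that each watermark operator advances monotonically, and the query-wide watermark recorded in offsetSeqMetadata is the minimum across all operators:

```python
# Illustrative sketch only (hypothetical names, not Spark's API): track one
# watermark per watermark operator; the query-wide watermark is the minimum.

def update_watermarks(watermark_map, observed_max_event_ms, delay_ms):
    """Advance each operator's watermark from the max event time it has seen
    minus its configured delay, then return the query-wide watermark as the
    minimum across all operators."""
    for op, max_event_ms in observed_max_event_ms.items():
        candidate = max_event_ms - delay_ms[op]
        # An individual operator's watermark never moves backwards.
        watermark_map[op] = max(watermark_map.get(op, 0), candidate)
    return min(watermark_map.values())
```

With two streams each using a 10-second delay, data at t=21s on one stream and t=11s on the other yields per-operator watermarks of 11s and 1s, so the lagging stream holds the query watermark at 1s.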
[GitHub] spark pull request #19239: [SPARK-22017] Take minimum of all watermark execs...
GitHub user joseph-torres opened a pull request: https://github.com/apache/spark/pull/19239

[SPARK-22017] Take minimum of all watermark execs in StreamExecution.

## What changes were proposed in this pull request?

Take the minimum of all watermark exec nodes as the "real" watermark in StreamExecution, rather than picking one arbitrarily.

## How was this patch tested?

new unit test

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joseph-torres/spark SPARK-22017

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19239.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19239

commit 4a7f53fdab1e5e640e156a4a3d2ba27837195195
Author: Jose Torres <j...@databricks.com>
Date: 2017-09-13T21:49:23Z

    Implement multiple watermark StreamExecution support.

commit 9b9cd19106fae9a2de268eb2b559ca1bf159e9c2
Author: Jose Torres <j...@databricks.com>
Date: 2017-09-14T18:30:40Z

    partially fix test

commit 6a4c80b696f42a445c7f846fada3f823e04bd3ab
Author: Jose Torres <j...@databricks.com>
Date: 2017-09-14T21:52:16Z

    Finish rewriting test
[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...
GitHub user joseph-torres opened a pull request: https://github.com/apache/spark/pull/19452

[SPARK-22136][SS] Evaluate one-sided conditions early in stream-stream joins.

## What changes were proposed in this pull request?

Evaluate one-sided conditions early in stream-stream joins. This is in addition to normal filter pushdown, because integrating it with the join logic allows it to take place in outer join scenarios. This means that rows which can never satisfy the join condition won't clog up the state.

## How was this patch tested?

new unit tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joseph-torres/spark SPARK-22136

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19452.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19452

commit c90fb3cb1e112edeacb4be2604a7b628f55697f4
Author: Jose Torres <j...@databricks.com>
Date: 2017-10-06T20:44:20Z

    Evaluate one-sided conditions early in stream-stream joins.
[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19452#discussion_r144396781

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala ---
@@ -66,6 +67,60 @@ object StreamingSymmetricHashJoinHelper extends Logging {
     }
   }

+  /**
+   * Wrapper around various useful splits of the join condition.
+   * left AND right AND joined is equivalent to full.
+   *
+   * Note that left and right do not necessarily contain *all* conjuncts which satisfy
+   * their condition. Any conjuncts after the first nondeterministic one are treated as
+   * nondeterministic for purposes of the split.
+   *
+   * @param left Deterministic conjuncts which reference only the left side of the join.
+   * @param right Deterministic conjuncts which reference only the right side of the join.
+   * @param joined Conjuncts which are in neither left nor right.
+   * @param full The full join condition.
+   */
+  case class JoinConditionSplitPredicates(
+      left: Option[Expression],
+      right: Option[Expression],
+      joined: Option[Expression],
+      full: Option[Expression]) {}
+
+  object JoinConditionSplitPredicates extends PredicateHelper {
+    def apply(condition: Option[Expression], left: SparkPlan, right: SparkPlan):
+        JoinConditionSplitPredicates = {
+      // Split the condition into 3 parts:
+      // * Conjuncts that can be applied to the left before storing.
+      // * Conjuncts that can be applied to the right before storing.
+      // * Conjuncts that must be applied to the full row at join time.
+      //
+      // Note that the third category includes both conjuncts that reference both sides
+      // and all nondeterministic conjuncts. Nondeterministic conjuncts can't be shortcutted
+      // to preserve any stateful semantics they may have.
+      val (leftCondition, rightCondition, joinedCondition) = {
+        if (condition.isEmpty) {
+          (None, None, None)
+        } else {
+          val (candidates, containingNonDeterministic) =
+            splitConjunctivePredicates(condition.get).span(_.deterministic)
--- End diff --

It's in a bunch of places in PushDownPredicate, but the reason for it isn't documented in any of those places, so I'm not sure where the right place to point is. I'm adding some documentation here describing why.
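The split this code performs can be mimicked in a plain-Python sketch. This is not the Spark implementation; each conjunct here is a hypothetical (name, referenced-attributes, deterministic) triple standing in for a Catalyst Expression, while the real code uses splitConjunctivePredicates and span(_.deterministic):

```python
# Illustrative sketch of the join-condition split. Conjuncts at or after the
# first nondeterministic one stay in the joined (post-join) bucket, since
# evaluating them early could change their stateful semantics.

def split_join_condition(conjuncts, left_attrs, right_attrs):
    candidates, tail = [], []
    seen_nondeterministic = False
    for conjunct in conjuncts:
        _name, _refs, deterministic = conjunct
        if seen_nondeterministic or not deterministic:
            seen_nondeterministic = True
            tail.append(conjunct)  # treated as nondeterministic
        else:
            candidates.append(conjunct)
    # A deterministic conjunct goes to the first bucket whose attributes
    # cover all of its references; everything else is evaluated post-join.
    left = [c for c in candidates if set(c[1]) <= left_attrs]
    right = [c for c in candidates if set(c[1]) <= right_attrs and c not in left]
    joined = [c for c in candidates if c not in left and c not in right] + tail
    return left, right, joined
```

Note that literal-only conjuncts reference no attributes, so they land in `left`, the first bucket they fit, which matches the "only literals" behavior documented in the helper suite's comments.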
[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19452#discussion_r10608

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -206,10 +213,19 @@ case class StreamingSymmetricHashJoinExec(
     val updateStartTimeNs = System.nanoTime
     val joinedRow = new JoinedRow

+    // Filter the joined rows based on the given condition.
+    val leftPreJoinFilter =
+      newPredicate(condition.leftSideOnly.getOrElse(Literal(true)), output).eval _
--- End diff --

Moving it in would require either also passing in the left ++ right input attributes, or passing preJoin and postJoin filters differently. I'm not sure which option is cleaner, so I can make the change you think is best.
[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19452#discussion_r10844

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -161,6 +164,10 @@ case class StreamingSymmetricHashJoinExec(
       new SerializableConfiguration(SessionState.newHadoopConf(
         sparkContext.hadoopConfiguration, sqlContext.conf)))

+  val nullLeft = new GenericInternalRow(left.output.map(_.withNullability(true)).length)
+  val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length)
--- End diff --

The problem is that the left joiner has left input attributes, but needs the right null row.
[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19452#discussion_r144620005

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSymmetricHashJoinHelperSuite.scala ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.execution.{LeafExecNode, LocalTableScanExec, SparkPlan}
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.JoinConditionSplitPredicates
+import org.apache.spark.sql.types.DataTypes
+
+class StreamingSymmetricHashJoinHelperSuite extends StreamTest {
+  import org.apache.spark.sql.functions._
+
+  val attributeA = AttributeReference("a", DataTypes.IntegerType)()
+  val attributeB = AttributeReference("b", DataTypes.IntegerType)()
+  val attributeC = AttributeReference("c", DataTypes.IntegerType)()
+  val attributeD = AttributeReference("d", DataTypes.IntegerType)()
+  val colA = new Column(attributeA)
+  val colB = new Column(attributeB)
+  val colC = new Column(attributeC)
+  val colD = new Column(attributeD)
+
+  val left = new LocalTableScanExec(Seq(attributeA, attributeB), Seq())
+  val right = new LocalTableScanExec(Seq(attributeC, attributeD), Seq())
+
+  test("empty") {
+    val split = JoinConditionSplitPredicates(None, left, right)
+    assert(split.leftSideOnly.isEmpty)
+    assert(split.rightSideOnly.isEmpty)
+    assert(split.bothSides.isEmpty)
+    assert(split.full.isEmpty)
+  }
+
+  test("only literals") {
+    // Literal-only conjuncts end up on the left side because that's the first bucket they fit in.
+    // There's no semantic reason they couldn't be in any bucket.
+    val predicate = (lit(1) < lit(5) && lit(6) < lit(7) && lit(0) === lit(-1)).expr
+    val split = JoinConditionSplitPredicates(Some(predicate), left, right)

+    assert(split.leftSideOnly.contains(predicate))
+    assert(split.rightSideOnly.isEmpty)
--- End diff --

I don't want to get into duplicating predicates here for the sake of symmetry. I could move literals to the post-join part maybe?
[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19056#discussion_r135851433

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -126,16 +128,17 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
       batches.slice(sliceStart, sliceEnd)
     }

-    import sqlContext.implicits._
-    val rawBatch = sqlContext.createDataset(rawList)
+    val rdd = sqlContext.sparkContext.parallelize(rawList).map(
+      v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
+    val rawBatch = sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)

     // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp
     // if requested.
     if (includeTimestamp) {
       rawBatch.toDF("value", "timestamp")
     } else {
       // Strip out timestamp
-      rawBatch.select("_1").toDF("value")
+      rawBatch.select("value").toDF()
--- End diff --

Done.
[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19056#discussion_r135851647

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -126,16 +128,17 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
       batches.slice(sliceStart, sliceEnd)
     }

-    import sqlContext.implicits._
-    val rawBatch = sqlContext.createDataset(rawList)
+    val rdd = sqlContext.sparkContext.parallelize(rawList).map(
+      v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
+    val rawBatch = sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)

     // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp
     // if requested.
     if (includeTimestamp) {
       rawBatch.toDF("value", "timestamp")
--- End diff --

Done.
[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19056#discussion_r135851225

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -39,6 +39,16 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)

   protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations)

+  override protected def checkInvariants(
+      result: LogicalPlan,
+      original: LogicalPlan,
+      rule: Rule[LogicalPlan]): Unit = {
+    assert(
+      result.isStreaming == original.isStreaming,
+      s"Rule ${rule.ruleName} changed isStreaming from original ${original.isStreaming}:" +
--- End diff --

Done.
[GitHub] spark pull request #18973: [SPARK-21765] Set isStreaming on leaf nodes for s...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/18973#discussion_r138114144

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -443,7 +444,8 @@ case class Range(
     end: Long,
     step: Long,
     numSlices: Option[Int],
-    output: Seq[Attribute])
+    output: Seq[Attribute],
+    override val isStreaming: Boolean)
--- End diff --

I don't think there's necessarily a reason it shouldn't be able to; streaming sources are free to define getBatch() however they'd like. Right now the only source actually doing that is a fake source in StreamSuite.
[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19056#discussion_r137143373

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -128,8 +128,9 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
       batches.slice(sliceStart, sliceEnd)
     }

-    val rdd = sqlContext.sparkContext.parallelize(rawList).map(
-      v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
+    val rdd = sqlContext.sparkContext.
--- End diff --

done
[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19056#discussion_r137048000

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -130,16 +130,7 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
     val rdd = sqlContext.sparkContext.parallelize(rawList).map(
       v => InternalRow(UTF8String.fromString(v._1), v._2.getTime()))
--- End diff --

Done
[GitHub] spark pull request #19212: [SPARK-21988] Add default stats to StreamingExecu...
GitHub user joseph-torres opened a pull request: https://github.com/apache/spark/pull/19212

[SPARK-21988] Add default stats to StreamingExecutionRelation.

## What changes were proposed in this pull request?

Add default stats to StreamingExecutionRelation.

## How was this patch tested?

existing unit tests and an explain() test to be sure

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joseph-torres/spark SPARK-21988

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19212.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19212

commit 80d02681ae5290fefe991a7faef7273d79f5f1dd
Author: Jose Torres <j...@databricks.com>
Date: 2017-09-12T21:44:51Z

    Add default stats to StreamingExecutionRelation.
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140933095

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -157,11 +164,20 @@ case class StreamingSymmetricHashJoinExec(
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

-  override def output: Seq[Attribute] = left.output ++ right.output
+  override def output: Seq[Attribute] = joinType match {
+    case _: InnerLike => left.output ++ right.output
+    case LeftOuter => left.output ++ right.output.map(_.withNullability(true))
+    case RightOuter => left.output.map(_.withNullability(true)) ++ right.output
+    case _ =>
+      throwBadJoinTypeException()
+      Seq()
+  }

   override def outputPartitioning: Partitioning = joinType match {
     case _: InnerLike =>
       PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
+    case LeftOuter => PartitioningCollection(Seq(right.outputPartitioning))
--- End diff --

That's what I thought at first, but the non-streaming HashJoin seems to do the partitioning this way. (Or am I misunderstanding what buildSide means in that trait?)
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140935586

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -216,22 +232,70 @@ case class StreamingSymmetricHashJoinExec(
     }

     // Filter the joined rows based on the given condition.
-    val outputFilterFunction =
-      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output).eval _
-    val filteredOutputIter =
-      (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map { row =>
-        numOutputRows += 1
-        row
-      }
+    val outputFilterFunction = newPredicate(condition.getOrElse(Literal(true)), output).eval _
+
+    val filteredInnerOutputIter = (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction)
--- End diff --

I don't think it's correct to filter the output from remove. The query

    Seq(1, 2, 3).toDF("val1").join(Seq[Int]().toDF("val2"), 'val1 === 'val2 && 'val1 === 0, "left_outer")

produces ((1, null), (2, null), (3, null)). Outer joins with watermark range conditions also wouldn't work if we filtered remove output, since the range condition would exclude null values.
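The semantics being defended here can be checked against a toy model in plain Python (hypothetical shape, not Spark's UnsafeRow machinery): in a left outer join, a left row that fails a left-side-only conjunct can never match, so it is null-padded rather than dropped. That is why such conjuncts are safe to evaluate before storing state, and why the null-padded rows produced at removal time must not be re-filtered through the join condition.

```python
# Toy model of a left outer join with a one-sided conjunct evaluated early.
# Rows failing the left-only predicate can never satisfy the full join
# condition, so they are emitted null-padded immediately instead of being
# stored in (and later aged out of) join state.

def left_outer_join(left_rows, right_rows, left_only_pred, join_pred):
    output = []
    for l in left_rows:
        if not left_only_pred(l):
            output.append((l, None))  # can never match: null-pad right away
            continue
        matches = [(l, r) for r in right_rows if join_pred(l, r)]
        output.extend(matches if matches else [(l, None)])
    return output
```

This reproduces the quoted query's result: joining Seq(1, 2, 3) against an empty side under 'val1 === 'val2 && 'val1 === 0 yields every left row null-padded.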
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140936872

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala ---
@@ -87,70 +87,157 @@ class SymmetricHashJoinStateManager(
   }

   /**
-   * Remove using a predicate on keys. See class docs for more context and implement details.
+   * Remove using a predicate on keys.
+   *
+   * This produces an iterator over the (key, value) pairs satisfying condition(key), where the
+   * underlying store is updated as a side-effect of producing next.
+   *
+   * This implies the iterator must be consumed fully without any other operations on this manager
+   * or the underlying store being interleaved.
    */
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-    val allKeyToNumValues = keyToNumValues.iterator
-
-    while (allKeyToNumValues.hasNext) {
-      val keyToNumValue = allKeyToNumValues.next
-      if (condition(keyToNumValue.key)) {
-        keyToNumValues.remove(keyToNumValue.key)
-        keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = {
+    new NextIterator[UnsafeRowPair] {
+
+      private val allKeyToNumValues = keyToNumValues.iterator
+
+      private var currentKeyToNumValue: KeyAndNumValues = null
+      private var currentValues: Iterator[KeyWithIndexAndValue] = null
+
+      private def currentKey = currentKeyToNumValue.key
+
+      private val reusedPair = new UnsafeRowPair()
+
+      private def getAndRemoveValue() = {
+        val keyWithIndexAndValue = currentValues.next()
+        keyWithIndexToValue.remove(currentKey, keyWithIndexAndValue.valueIndex)
+        reusedPair.withRows(currentKey, keyWithIndexAndValue.value)
+      }
+
+      override def getNext(): UnsafeRowPair = {
+        if (currentValues != null && currentValues.hasNext) {
+          return getAndRemoveValue()
+        } else {
+          while (allKeyToNumValues.hasNext) {
+            currentKeyToNumValue = allKeyToNumValues.next()
+            if (condition(currentKey)) {
+              currentValues = keyWithIndexToValue.getAll(
+                currentKey, currentKeyToNumValue.numValue)
+              keyToNumValues.remove(currentKey)
+
+              if (currentValues.hasNext) {
+                return getAndRemoveValue()
+              }
+            }
+          }
+        }
+
+        finished = true
+        null
       }
+
+      override def close: Unit = {}
     }
   }

   /**
-   * Remove using a predicate on values. See class docs for more context and implementation details.
+   * Remove using a predicate on values.
+   *
+   * At a high level, this produces an iterator over the (key, value) pairs such that value
+   * satisfies the predicate, where producing an element removes the value from the state store
+   * and producing all elements with a given key updates it accordingly.
+   *
+   * This implies the iterator must be consumed fully without any other operations on this manager
+   * or the underlying store being interleaved.
    */
-  def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = {
-    val allKeyToNumValues = keyToNumValues.iterator
+  def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = {
+    new NextIterator[UnsafeRowPair] {

-    while (allKeyToNumValues.hasNext) {
-      val keyToNumValue = allKeyToNumValues.next
-      val key = keyToNumValue.key
+      // Reuse this object to avoid creation+GC overhead.
+      private val reusedPair = new UnsafeRowPair()

-      var numValues: Long = keyToNumValue.numValue
-      var index: Long = 0L
-      var valueRemoved: Boolean = false
-      var valueForIndex: UnsafeRow = null
+      private val allKeyToNumValues = keyToNumValues.iterator

-      while (index < numValues) {
-        if (valueForIndex == null) {
-          valueForIndex = keyWithIndexToValue.get(key, index)
+      private var currentKey: UnsafeRow = null
+      private var numValues: Long = 0L
+      private var index: Long = 0L
+      private var valueRemoved: Boolean = false
+
+      // Push the data for the current key to the numValues store, and reset the tracking variables
+      // to their empty state.
+      private def storeCurrentKey(): Unit = {
--- End diff --

Yeah sorry, I meant to go back and fully address this comment the
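The shape of the change under review, removal rewritten as a lazy iterator whose consumption mutates the store, can be sketched with a Python generator (a hypothetical dict-of-lists standing in for the state store):

```python
# Sketch of removeByKeyCondition as a lazy, side-effecting iterator.
# Producing each (key, value) pair removes it from the backing store, so the
# iterator must be consumed fully before the store is touched again.

def remove_by_key_condition(store, condition):
    for key in list(store.keys()):
        if condition(key):
            # Drop the key eagerly, then stream out its removed values.
            for value in store.pop(key):
                yield key, value
```

Nothing happens until the iterator is consumed; fully draining it leaves only the keys that failed the condition, which mirrors the contract described in the new doc comment.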
[GitHub] spark pull request #19327: [SPARK-22136][SS] Implement stream-stream outer j...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r141984167

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala ---
@@ -413,36 +414,103 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
       batchStreamSupported = false,
       streamBatchSupported = false)

-  // Left outer joins: *-stream not allowed
+  // Left outer joins: *-stream not allowed with default condition
--- End diff --

The condition that testBinaryOperationInStreamingPlan uses if left unspecified. I've removed the reference since it wasn't there originally, but let me know if you think some additional comment belongs here.
[GitHub] spark pull request #19327: [SPARK-22136][SS] Implement stream-stream outer j...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r142442657

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala ---
@@ -425,6 +426,10 @@ class StreamingJoinSuite extends StreamTest with StateStoreMetricsTest with Befo
     // Test static comparisons
     assert(watermarkFrom("cast(leftTime AS LONG) > 10") === Some(1))

+    // Test non-positive results
+    assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS LONG) - 10") === Some(0))
+    assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS LONG) - 100") === Some(-9))
--- End diff --

As mentioned earlier, I'm not sure how to move them; this test method as written relies on building a physical plan.
[GitHub] spark issue #19327: [WIP] Implement stream-stream outer joins.
Github user joseph-torres commented on the issue: https://github.com/apache/spark/pull/19327

I believe I've addressed all comments. Some refactorings made some comments obsolete, though. I've also fixed 1 bug and 1 test issue causing the 2 unit test failures. There's still the outstanding question of whether allRemovalsTimeMs can/should include time spent outer joining, in addition to test review.
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140968754

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala ---
@@ -87,70 +87,157 @@ class SymmetricHashJoinStateManager(
   }

   /**
-   * Remove using a predicate on keys. See class docs for more context and implement details.
+   * Remove using a predicate on keys.
+   *
+   * This produces an iterator over the (key, value) pairs satisfying condition(key), where the
+   * underlying store is updated as a side-effect of producing next.
+   *
+   * This implies the iterator must be consumed fully without any other operations on this manager
+   * or the underlying store being interleaved.
    */
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-    val allKeyToNumValues = keyToNumValues.iterator
-
-    while (allKeyToNumValues.hasNext) {
-      val keyToNumValue = allKeyToNumValues.next
-      if (condition(keyToNumValue.key)) {
-        keyToNumValues.remove(keyToNumValue.key)
-        keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = {
+    new NextIterator[UnsafeRowPair] {
+
+      private val allKeyToNumValues = keyToNumValues.iterator
+
+      private var currentKeyToNumValue: KeyAndNumValues = null
+      private var currentValues: Iterator[KeyWithIndexAndValue] = null
+
+      private def currentKey = currentKeyToNumValue.key
+
+      private val reusedPair = new UnsafeRowPair()
+
+      private def getAndRemoveValue() = {
+        val keyWithIndexAndValue = currentValues.next()
+        keyWithIndexToValue.remove(currentKey, keyWithIndexAndValue.valueIndex)
+        reusedPair.withRows(currentKey, keyWithIndexAndValue.value)
+      }
+
+      override def getNext(): UnsafeRowPair = {
+        if (currentValues != null && currentValues.hasNext) {
+          return getAndRemoveValue()
+        } else {
+          while (allKeyToNumValues.hasNext) {
+            currentKeyToNumValue = allKeyToNumValues.next()
+            if (condition(currentKey)) {
+              currentValues = keyWithIndexToValue.getAll(
+                currentKey, currentKeyToNumValue.numValue)
+              keyToNumValues.remove(currentKey)
+
+              if (currentValues.hasNext) {
+                return getAndRemoveValue()
+              }
+            }
+          }
+        }
+
+        finished = true
+        null
       }
+
+      override def close: Unit = {}
     }
   }

   /**
-   * Remove using a predicate on values. See class docs for more context and implementation details.
+   * Remove using a predicate on values.
+   *
+   * At a high level, this produces an iterator over the (key, value) pairs such that value
+   * satisfies the predicate, where producing an element removes the value from the state store
+   * and producing all elements with a given key updates it accordingly.
+   *
+   * This implies the iterator must be consumed fully without any other operations on this manager
+   * or the underlying store being interleaved.
    */
-  def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = {
-    val allKeyToNumValues = keyToNumValues.iterator
+  def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = {
+    new NextIterator[UnsafeRowPair] {

-    while (allKeyToNumValues.hasNext) {
-      val keyToNumValue = allKeyToNumValues.next
-      val key = keyToNumValue.key
+      // Reuse this object to avoid creation+GC overhead.
+      private val reusedPair = new UnsafeRowPair()

-      var numValues: Long = keyToNumValue.numValue
-      var index: Long = 0L
-      var valueRemoved: Boolean = false
-      var valueForIndex: UnsafeRow = null
+      private val allKeyToNumValues = keyToNumValues.iterator

-      while (index < numValues) {
-        if (valueForIndex == null) {
-          valueForIndex = keyWithIndexToValue.get(key, index)
+      private var currentKey: UnsafeRow = null
+      private var numValues: Long = 0L
+      private var index: Long = 0L
+      private var valueRemoved: Boolean = false
+
+      // Push the data for the current key to the numValues store, and reset the tracking variables
+      // to their empty state.
+      private def storeCurrentKey(): Unit = {
+        if (valueRemoved) {
+          if (numValues >= 1) {
+            key
[GitHub] spark issue #18973: [SPARK-21765] Set isStreaming on leaf nodes for streamin...
Github user joseph-torres commented on the issue:
https://github.com/apache/spark/pull/18973

Addressed comments from @tdas

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18973: [SPARK-21765] Set isStreaming on leaf nodes for s...
Github user joseph-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/18973#discussion_r134367543

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -420,8 +420,10 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * converted to Catalyst rows.
    */
   private[sql]
-  def internalCreateDataFrame(catalystRows: RDD[InternalRow], schema: StructType) = {
-    sparkSession.internalCreateDataFrame(catalystRows, schema)
+  def internalCreateDataFrame(catalystRows: RDD[InternalRow],
--- End diff --

Done.
[GitHub] spark pull request #18973: [SPARK-21765] Set isStreaming on leaf nodes for s...
Github user joseph-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/18973#discussion_r134367548

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -728,7 +729,16 @@ class FakeDefaultSource extends FakeSource {
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
     val startOffset = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L) + 1
-    spark.range(startOffset, end.asInstanceOf[LongOffset].offset + 1).toDF("a")
+    val ds = new Dataset[java.lang.Long](
--- End diff --

I've tried addressing this a few different ways, and I can't come up with anything cleaner than the current solution. Directly creating a DF doesn't set the isStreaming bit, and a bunch of copying and casting is required to get it set; using LocalRelation requires explicitly handling the encoding of the rows, since LocalRelation requires InternalRow input.
[GitHub] spark pull request #18973: [SPARK-21765] Set isStreaming on leaf nodes for s...
Github user joseph-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/18973#discussion_r134367553

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -118,8 +122,15 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       batches.slice(sliceStart, sliceEnd)
     }

-    logDebug(
-      s"MemoryBatch [$startOrdinal, $endOrdinal]: ${newBlocks.flatMap(_.collect()).mkString(", ")}")
+    logDebug({
--- End diff --

Done.
[GitHub] spark issue #19056: [SPARK-21765] Check that optimization doesn't affect isS...
Github user joseph-torres commented on the issue:
https://github.com/apache/spark/pull/19056

Addressed all comments.
[GitHub] spark pull request #19056: [SPARK-21765] Check that optimization doesn't aff...
GitHub user joseph-torres opened a pull request:
https://github.com/apache/spark/pull/19056

[SPARK-21765] Check that optimization doesn't affect isStreaming bit.

## What changes were proposed in this pull request?

Add an assert in logical plan optimization that the isStreaming bit stays the same, and fix the empty-relation rules where that invariant wasn't being preserved.

## How was this patch tested?

new and existing unit tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-21765-followup

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/19056.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #19056

commit b83349567760dd0d33388d3fc68d8db1b648e1f1
Author: Jose Torres <joseph-tor...@databricks.com>
Date: 2017-08-25T20:48:49Z
Check that optimization doesn't affect isStreaming bit.
[GitHub] spark pull request #18973: [SPARK-21765] Set isStreaming on leaf nodes for s...
GitHub user joseph-torres opened a pull request:
https://github.com/apache/spark/pull/18973

[SPARK-21765] Set isStreaming on leaf nodes for streaming plans.

## What changes were proposed in this pull request?

All logically streaming plans will now have isStreaming set. This involved adding isStreaming as a case class arg in a few cases, since a node might be logically streaming depending on where it came from.

## How was this patch tested?

Existing unit tests - no functional change is intended in this PR.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-21765

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/18973.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #18973

commit 5115180f7269dab114b19891ae3eacecaed153bf
Author: Jose Torres <joseph-tor...@databricks.com>
Date: 2017-08-15T03:49:38Z
partial - Add isStreaming bit to all LogicalPlan leaves.

commit bf61247d2baaa8d32c66df9060c1b88e79eaa824
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date: 2017-08-17T00:35:56Z
Fixed streaming tests

commit 86a3de9a4d8b8414606d13bb34fd758428b83838
Author: Jose Torres <joseph-tor...@databricks.com>
Date: 2017-08-17T17:52:52Z
remove spurious commenting out
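The mechanics the PR describes can be sketched with a toy plan hierarchy (illustrative classes only, not Spark's actual LogicalPlan): leaf nodes carry the bit explicitly, which is why some of them needed it as a case class arg, and every other node derives it from its children.

```scala
// Toy logical plan. Leaf nodes declare isStreaming themselves; inner nodes are
// streaming iff any child is (mirroring how Spark derives the bit up the tree).
sealed trait Plan {
  def children: Seq[Plan]
  def isStreaming: Boolean = children.exists(_.isStreaming)
}

case class Leaf(streaming: Boolean) extends Plan {
  override def children: Seq[Plan] = Nil
  override def isStreaming: Boolean = streaming
}

case class Project(child: Plan) extends Plan {
  override def children: Seq[Plan] = Seq(child)
}

case class Join(left: Plan, right: Plan) extends Plan {
  override def children: Seq[Plan] = Seq(left, right)
}
```

With this shape, a plan mixing batch and streaming leaves (e.g. a stream-batch join) is correctly classified as streaming without any inner node needing its own flag.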
[GitHub] spark pull request #19461: [SPARK-22230] Swap per-row order in state store r...
GitHub user joseph-torres opened a pull request:
https://github.com/apache/spark/pull/19461

[SPARK-22230] Swap per-row order in state store restore.

## What changes were proposed in this pull request?

In state store restore, for each row, put the saved state before the row in the iterator instead of after. This fixes an issue where agg(last('attr)) will forever return the last value of 'attr from the first microbatch.

## How was this patch tested?

new unit test

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark SPARK-22230

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/19461.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #19461

commit 17ef8a843e7dec8da0625caeda213cb1f5c64a4a
Author: Jose Torres <j...@databricks.com>
Date: 2017-10-09T20:55:19Z
Swap per-row order in state store restore.
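The ordering issue can be illustrated with a toy restore step (a hypothetical helper, not Spark's actual StateStoreRestoreExec): if the saved state row is emitted after a key's new rows, a last() aggregate keeps resolving to the stale saved value; emitting it first puts the newest input row last.

```scala
object RestoreSketch {
  // For each key, emit the saved state row (if any) *before* that key's new rows,
  // so a downstream last() aggregate sees the newest input row last.
  def restore(saved: Map[String, Int], rows: Seq[(String, Int)]): Seq[(String, Int)] =
    rows.groupBy(_._1).toSeq.flatMap { case (key, keyRows) =>
      saved.get(key).map(key -> _).toSeq ++ keyRows
    }
}
```

Under the pre-fix ordering (state after rows), the last element per key would always be the saved state from the first microbatch, which is exactly the reported bug.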
[GitHub] spark pull request #19452: [SPARK-22136][SS] Evaluate one-sided conditions e...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19452#discussion_r144148695 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala --- @@ -66,6 +67,60 @@ object StreamingSymmetricHashJoinHelper extends Logging { } } + /** + * Wrapper around various useful splits of the join condition. + * left AND right AND joined is equivalent to full. + * + * Note that left and right do not necessarily contain *all* conjuncts which satisfy + * their condition. Any conjuncts after the first nondeterministic one are treated as + * nondeterministic for purposes of the split. + * + * @param left Deterministic conjuncts which reference only the left side of the join. + * @param right Deterministic conjuncts which reference only the right side of the join. + * @param joined Conjuncts which are in neither left nor right. + * @param full The full join condition. + */ + case class JoinConditionSplitPredicates( +left: Option[Expression], +right: Option[Expression], +joined: Option[Expression], +full: Option[Expression]) {} + + object JoinConditionSplitPredicates extends PredicateHelper { +def apply(condition: Option[Expression], left: SparkPlan, right: SparkPlan): +JoinConditionSplitPredicates = { + // Split the condition into 3 parts: + // * Conjuncts that can be applied to the left before storing. + // * Conjuncts that can be applied to the right before storing. + // * Conjuncts that must be applied to the full row at join time. + // + // Note that the third category includes both conjuncts that reference both sides + // and all nondeterministic conjuncts. Nondeterministic conjuncts can't be shortcutted + // to preserve any stateful semantics they may have. 
+ val (leftCondition, rightCondition, joinedCondition) = { +if (condition.isEmpty) { + (None, None, None) +} else { + val (candidates, containingNonDeterministic) = +splitConjunctivePredicates(condition.get).span(_.deterministic) --- End diff -- Nondeterministic conjuncts don't commute across && because Spark does shortcut evaluation. (That is, "udf('val) == 0 && false" will cause udf to be evaluated, while "false && udf('val) == 0" will not.) This behavior is copied from how predicate pushdown handles nondeterminism. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
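The span-based split defended in this comment can be exercised in isolation (Predicate here is a stand-in for Catalyst's Expression): only the deterministic *prefix* of the conjunct list is eligible for early evaluation, so no conjunct runs earlier than left-to-right short-circuiting would allow.

```scala
// Stand-in for Catalyst's Expression; only the `deterministic` flag matters here.
case class Predicate(name: String, deterministic: Boolean)

object SplitSketch {
  // Mirror of splitConjunctivePredicates(...).span(_.deterministic): the first
  // nondeterministic conjunct ends the prefix of early-evaluation candidates, and
  // every conjunct after it stays in the "evaluate at join time" remainder.
  def splitForEarlyEval(conjuncts: Seq[Predicate]): (Seq[Predicate], Seq[Predicate]) =
    conjuncts.span(_.deterministic)
}
```

Note that a deterministic conjunct sitting *after* a nondeterministic one lands in the remainder, matching the doc comment's caveat that left/right need not contain all conjuncts satisfying their condition.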
[GitHub] spark issue #19465: [SPARK-21988][SS]Implement StreamingRelation.computeStat...
Github user joseph-torres commented on the issue:
https://github.com/apache/spark/pull/19465

LGTM
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user joseph-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/19926#discussion_r156471355

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -783,29 +430,29 @@ class StreamExecution(
     }

     while (notDone) {
-      awaitBatchLock.lock()
+      awaitProgressLock.lock()
       try {
-        awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
+        awaitProgressLockCondition.await(100, TimeUnit.MILLISECONDS)
         if (streamDeathCause != null) {
           throw streamDeathCause
         }
       } finally {
-        awaitBatchLock.unlock()
+        awaitProgressLock.unlock()
       }
     }
     logDebug(s"Unblocked at $newOffset for $source")
   }

   /** A flag to indicate that a batch has completed with no new data available. */
-  @volatile private var noNewData = false
+  @volatile protected var noNewData = false
--- End diff --

Yes. The flag is really just a test harness; it's only used in processAllAvailable, so tests can block until there's a batch (or now epoch) that doesn't contain any data.
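The await loop quoted in this diff follows a standard lock/condition idiom: wait with a timeout so the waiter can periodically re-check an exit condition (streamDeathCause in the real code) between wake-ups. A minimal standalone sketch of that idiom (ProgressSignal is a hypothetical name, not a Spark class):

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

class ProgressSignal {
  private val lock = new ReentrantLock()
  private val progressed = lock.newCondition()
  @volatile private var done = false

  def isDone: Boolean = done

  def signalDone(): Unit = {
    lock.lock()
    try {
      done = true
      progressed.signalAll()  // wake every waiter
    } finally {
      lock.unlock()
    }
  }

  // Timed await so the loop can re-check shared state (e.g. a failure cause)
  // between wake-ups instead of blocking indefinitely.
  def awaitDone(): Unit = {
    while (!done) {
      lock.lock()
      try {
        progressed.await(100, TimeUnit.MILLISECONDS)
      } finally {
        lock.unlock()
      }
    }
  }
}
```

The timeout bounds how long a waiter can miss a signal, which is why the real loop can safely surface streamDeathCause even if the signalling thread died without notifying.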
[GitHub] spark issue #19926: [SPARK-22733] Split StreamExecution into MicroBatchExecu...
Github user joseph-torres commented on the issue:
https://github.com/apache/spark/pull/19926

retest this please
[GitHub] spark issue #19925: [SPARK-22732] Add Structured Streaming APIs to DataSourc...
Github user joseph-torres commented on the issue:
https://github.com/apache/spark/pull/19925

retest this please
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user joseph-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/19926#discussion_r156532669

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
@@ -237,7 +237,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
         "is not supported in streaming DataFrames/Datasets and will be disabled.")
     }

-    new StreamingQueryWrapper(new StreamExecution(
+    new StreamingQueryWrapper(new MicroBatchExecution(
--- End diff --

My current thinking is to have it be a new trigger type. It can't really be a config, because continuous processing (at least in the initial implementation) won't support all operators.
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user joseph-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/19926#discussion_r156532817

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -447,296 +384,6 @@ class StreamExecution(
     }
   }

-  /**
-   * Populate the start offsets to start the execution at the current offsets stored in the sink
-   * (i.e. avoid reprocessing data that we have already processed). This function must be called
-   * before any processing occurs and will populate the following fields:
-   *  - currentBatchId
-   *  - committedOffsets
-   *  - availableOffsets
-   * The basic structure of this method is as follows:
-   *
-   * Identify (from the offset log) the offsets used to run the last batch
-   * IF last batch exists THEN
-   *   Set the next batch to be executed as the last recovered batch
-   *   Check the commit log to see which batch was committed last
-   *   IF the last batch was committed THEN
-   *     Call getBatch using the last batch start and end offsets
-   *     // above line is needed since some sources assume last batch always re-executes
-   *     Setup for a new batch i.e., start = last batch end, and identify new end
-   *   DONE
-   * ELSE
-   *   Identify a brand new batch
-   * DONE
-   */
-  private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
-    offsetLog.getLatest() match {
--- End diff --

Sure, we could do that.
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user joseph-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/19925#discussion_r156539696

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.collection.JavaConverters._
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}
+
+object ContinuousRateStreamSource {
+  val NUM_PARTITIONS = "numPartitions"
+  val ROWS_PER_SECOND = "rowsPerSecond"
+}
+
+case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, Long]) extends Offset {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+  override val json = Serialization.write(partitionToStartValue)
+}
+
+case class ContinuousRateStreamPartitionOffset(partition: Int, start: Long) extends PartitionOffset
+
+class ContinuousRateStreamReader(options: DataSourceV2Options)
+  extends ContinuousReader {
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+
+  val numPartitions = options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+    assert(offsets.length == numPartitions)
+    val tuples = offsets.map {
+      case ContinuousRateStreamPartitionOffset(p, s) => p -> s
+    }
+    ContinuousRateStreamOffset(Map(tuples: _*))
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+    ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json))
+  }
+
+  override def readSchema(): StructType = {
+    StructType(
+      StructField("timestamp", TimestampType, false) ::
+      StructField("value", LongType, false) :: Nil)
+  }
+
+  private var offset: java.util.Optional[Offset] = _
+
+  override def setOffset(offset: java.util.Optional[Offset]): Unit = {
+    this.offset = offset
+  }
+
+  override def getStartOffset(): Offset = offset.get()
+
+  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
+    val partitionStartMap = Option(offset.orElse(null)).map {
+      case o: ContinuousRateStreamOffset => o.partitionToStartValue
+      case s: SerializedOffset => Serialization.read[Map[Int, Long]](s.json)
+      case _ => throw new IllegalArgumentException("invalid offset type for ContinuousRateSource")
+    }
+    if (partitionStartMap.exists(_.keySet.size > numPartitions)) {
+      throw new IllegalArgumentException("Start offset contained too many partitions.")
+    }
+    val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble
+
+    Range(0, numPartitions).map { n =>
+      // If the offset doesn't have a value for this partition, start from the beginning.
+      val start = partitionStartMap.flatMap(_.get(n)).getOrElse(0L + n)
+      // Have each partition advance by numPartitions each row, with starting points staggered
+      // by their partition index.
+      RateStreamReadTask(start, n, numPartitions, perPartitionRate)
+        .asInstanceOf[ReadTask[Row]]
+    }.asJava
+  }
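The stagger arithmetic in createReadTasks above can be checked in isolation (StaggerSketch is a hypothetical helper reproducing just that arithmetic): partition n starts at value n and advances by numPartitions per row, so the partitions jointly enumerate 0, 1, 2, ... with no gaps or overlaps.

```scala
object StaggerSketch {
  // First `count` values produced by one partition under the staggered scheme:
  // start at the partition index, advance by numPartitions each row.
  def firstValues(partition: Int, numPartitions: Int, count: Int): Seq[Long] =
    (0 until count).map(i => partition.toLong + i.toLong * numPartitions)
}
```

Interleaving all partitions' outputs therefore covers an unbroken prefix of the naturals, which is what lets the rate source keep a global rows-per-second target while each partition works independently.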
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user joseph-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/19925#discussion_r156540689
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user joseph-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/19925#discussion_r156542366
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user joseph-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/19926#discussion_r157047178

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.sources.v2.reader.Offset
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.util.{Clock, Utils}
+
+class MicroBatchExecution(
+    sparkSession: SparkSession,
+    name: String,
+    checkpointRoot: String,
+    analyzedPlan: LogicalPlan,
+    sink: Sink,
+    trigger: Trigger,
+    triggerClock: Clock,
+    outputMode: OutputMode,
+    deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+    sparkSession, name, checkpointRoot, analyzedPlan, sink,
+    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  private val triggerExecutor = trigger match {
+    case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
+    case OneTimeTrigger => OneTimeExecutor()
+    case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
+  }
+
+  override lazy val logicalPlan: LogicalPlan = {
+    assert(queryExecutionThread eq Thread.currentThread,
+      "logicalPlan must be initialized in QueryExecutionThread " +
+        s"but the current thread was ${Thread.currentThread}")
+    var nextSourceId = 0L
+    val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()
+    val _logicalPlan = analyzedPlan.transform {
+      case streamingRelation@StreamingRelation(dataSource, _, output) =>
+        toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
+          // Materialize source to avoid creating it in every batch
+          val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+          val source = dataSource.createSource(metadataPath)
+          nextSourceId += 1
+          // We still need to use the previous `output` instead of `source.schema` as attributes in
+          // "df.logicalPlan" has already used attributes of the previous `output`.
+          StreamingExecutionRelation(source, output)(sparkSession)
+        })
+    }
+    sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
+    uniqueSources = sources.distinct
+    _logicalPlan
+  }
+
+  /**
+   * Repeatedly attempts to run batches as data arrives.
+   */
+  protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
+    triggerExecutor.execute(() => {
+      startTrigger()
+
+      if (isActive) {
+        reportTimeTaken("triggerExecution") {
+          if (currentBatchId < 0) {
+            // We'll do this initialization only once
+            populateStartOffsets(sparkSessionForStream)
+            sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
+            logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+          } else {
+            constructNextBatch()
+          }
+          if (dataAvailable) {
+            currentStatus = currentStatus.copy(isDataAvailable = true)
+            updateStatusMessage("Processing new
[GitHub] spark pull request #20012: [SPARK-22824] Restore old offset for binary compa...
GitHub user joseph-torres opened a pull request:

    https://github.com/apache/spark/pull/20012

    [SPARK-22824] Restore old offset for binary compatibility

    ## What changes were proposed in this pull request?

    Some users depend on source compatibility with the org.apache.spark.sql.execution.streaming.Offset class. Although this is not a stable interface, we can keep it in place for now to simplify upgrades to 2.3.

    You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/joseph-torres/spark binary-compat

    Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/spark/pull/20012.patch

    To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

        This closes #20012

    commit 7d1df9855dee8b6bce5ebed136b8da4275f178c7
    Author: Jose Torres <j...@databricks.com>
    Date: 2017-12-18T21:34:50Z

        Restore old offset for binary compatibility

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
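The shim pattern this PR describes can be sketched outside Spark (a minimal sketch with stand-in names; `OffsetV2` and `LegacyOffset` are not the actual Spark classes): the old abstract class is restored as a subtype of the new public one, so user code compiled against the old name keeps compiling.

```scala
// Sketch of a source-compatibility shim (stand-in names; the real classes
// live in org.apache.spark.sql.sources.v2.reader and
// org.apache.spark.sql.execution.streaming).

// New public location of the abstraction.
abstract class OffsetV2 {
  def json: String
}

// Restored legacy class: the old name survives as an alias-by-inheritance,
// so existing user subclasses of it continue to compile and remain usable
// anywhere the new type is expected.
abstract class LegacyOffset extends OffsetV2

// Pre-existing user code that extended the old class still works unchanged:
case class UserOffset(n: Long) extends LegacyOffset {
  override def json: String = s"""{"n":$n}"""
}
```

Because the shim only adds a supertype, no user source changes are required, which is exactly the upgrade-path simplification the PR aims for.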
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
GitHub user joseph-torres opened a pull request:

    https://github.com/apache/spark/pull/19984

    [SPARK-22789] Map-only continuous processing execution

    ## What changes were proposed in this pull request?

    Basic continuous execution, supporting map/flatMap/filter, with commits and advancement through RPC.

    ## How was this patch tested?

    New unit-ish tests (exercising execution end to end).

    You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/joseph-torres/spark continuous-impl

    Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/spark/pull/19984.patch

    To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

        This closes #19984

    All commits below are authored by Jose Torres <j...@databricks.com>.

    commit d6bea84447d910e79d5926972d87a80bc5dc2e2e (2017-12-07T22:08:28Z)
        Refactor StreamExecution into a parent class so continuous processing can extend it
    commit df6b8861173d1e7853952c8f3ffe504975efe204 (2017-12-12T19:31:28Z)
        address fmt
    commit 6f0ce6b1cf1abf602c2b02ce6d31f46f8fa71b7c (2017-12-13T00:09:48Z)
        slight changes
    commit 2b360ab49bcab3c73ea85ce62202e40e950931ef (2017-12-13T00:10:34Z)
        rm spurious space
    commit 1b19f1cef17e7324997649ad8c5f97887912 (2017-12-13T00:35:30Z)
        fix compile
    commit 96eba13be9764e63f3d1375d7b51dbfd0675aa98 (2017-12-11T20:48:20Z)
        harness
    commit 2d5efadb9e7662363e3e4a3c66e0f5f73e4935ef (2017-12-11T21:18:25Z)
        awaitEpoch impl
    commit 578bbb7eb0725b795ac65d1beda436515f4f4eba (2017-12-11T21:46:09Z)
        move local[10] to only continuous suite
    commit 9051eff6c88838ac61ab45763ed84d593e2d4837 (2017-12-11T21:49:55Z)
        repeatedly restart
    commit 60fa4477591cc264b9ea253f64065d762ce3f96f (2017-12-11T22:02:52Z)
        fix some simple TODOs
    commit ea8e76ec75752d134433730ee1a007cce1fdcfe8 (2017-12-11T22:11:18Z)
        use runId instead of queryId for endpoint name
    commit d0f3cc7701d9eb3e7df571561e751f03c0537f3a (2017-12-11T22:19:03Z)
        more simple todos
    commit ba9dbaa1be2f54827a42f3177669082e7d1f99e2 (2017-12-11T22:27:12Z)
        remove old state
    commit 2cd005f4685e492ae78d6b9c579c80c2370d2f14 (2017-12-11T22:35:51Z)
        remove clean shutdown workaround in StreamTest
    commit a7fa31fb5375074d888bd0a94e317ad3f1692e5a (2017-12-11T22:50:09Z)
        update ContinuousExecution docs
    commit f687432a58acf7337885edfc01adc94188d174d8 (2017-12-11T22:59:14Z)
        add comments to EpochCoordinator
    commit 987b011ee78292c3379559910ebe101daf4f9450 (2017-12-12T00:02:54Z)
        change offset semantic to end of previous epoch
    commit 5494fc50ef99b3e584c287b03eaa32b30657a5ce (2017-12-12T00:18:40Z)
        document EpochCoordinator
    commit d6ef404b85fa6977b5f38a853dca11de5189b3f9 (2017-12-12T02:06:44Z)
        simplify epoch handling
    commit 647bd2745c1c0842002d4f71b61aa34beb0f8b29 (2017-12-12T19:17:58Z)
        stress tests
    commit 053a9f349a4829433a495aa5989f1ca1c8a3256e (2017-12-12T20:17:22Z)
        add minBatchesToRetain
    commit 7072d21444388fe167fa7e3475b3e95ec9923d5e (2017-12-12T20:43:33Z)
        add confs
    commit 4083a8f5c6b6ef298726234d54f23a90e971e77e (2017-12-12T21:10:33Z)
        latency suite not meaningful here
    commit 41d391f2027a4e8b3730d15cea7b7fbcdcec27de (2017-12-13T00:04:07Z)
        more stress::q
    commit 402cfa3b10dfb0f37ce8d94336be3b3c01fe9f90 (2017-12-13T18:55:23Z)
        use temp dir
    commit e4a1bc19db9ea0233879d270e725ed58d95a34ad (2017-12-14T19:37:36Z)
        fix against rebase
    commit 8887b3c92afe8bb1659f600785af5d97f085f2bb Author: Jose Torres <j...@
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r157278184

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1035,6 +1035,22 @@ object SQLConf {
         .booleanConf
         .createWithDefault(true)

    +  val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
    +    buildConf("spark.sql.streaming.continuous.executorQueueSize")
    +      .internal()
    +      .doc("The size (measured in number of rows) of the queue used in continuous execution to" +
    +        " buffer the results of a ContinuousDataReader.")
    +      .intConf
    --- End diff --

    Should it be? I can't imagine anything close to MAX_INT being a reasonable value here. Will it be hard to migrate to a long if we later discover it's needed?
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19925#discussion_r156741131

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala ---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import scala.collection.JavaConverters._
    +
    +import org.json4s.DefaultFormats
    +import org.json4s.jackson.Serialization
    +
    +import org.apache.spark.sql.Row
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.catalyst.util.DateTimeUtils
    +import org.apache.spark.sql.execution.streaming._
    +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}
    +
    +object ContinuousRateStreamSource {
    +  val NUM_PARTITIONS = "numPartitions"
    +  val ROWS_PER_SECOND = "rowsPerSecond"
    +}
    +
    +case class ContinuousRateStreamOffset(partitionToStartValue: Map[Int, Long]) extends Offset {
    +  implicit val defaultFormats: DefaultFormats = DefaultFormats
    +  override val json = Serialization.write(partitionToStartValue)
    +}
    +
    +case class ContinuousRateStreamPartitionOffset(partition: Int, start: Long) extends PartitionOffset
    +
    +class ContinuousRateStreamReader(options: DataSourceV2Options)
    +  extends ContinuousReader {
    +  implicit val defaultFormats: DefaultFormats = DefaultFormats
    +
    +  val numPartitions = options.get(ContinuousRateStreamSource.NUM_PARTITIONS).orElse("5").toInt
    +  val rowsPerSecond = options.get(ContinuousRateStreamSource.ROWS_PER_SECOND).orElse("6").toLong
    +
    +  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
    +    assert(offsets.length == numPartitions)
    +    val tuples = offsets.map {
    +      case ContinuousRateStreamPartitionOffset(p, s) => p -> s
    +    }
    +    ContinuousRateStreamOffset(Map(tuples: _*))
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    ContinuousRateStreamOffset(Serialization.read[Map[Int, Long]](json))
    +  }
    +
    +  override def readSchema(): StructType = {
    +    StructType(
    +      StructField("timestamp", TimestampType, false) ::
    +      StructField("value", LongType, false) :: Nil)
    +  }
    +
    +  private var offset: java.util.Optional[Offset] = _
    +
    +  override def setOffset(offset: java.util.Optional[Offset]): Unit = {
    +    this.offset = offset
    +  }
    +
    +  override def getStartOffset(): Offset = offset.get()
    +
    +  override def createReadTasks(): java.util.List[ReadTask[Row]] = {
    +    val partitionStartMap = Option(offset.orElse(null)).map {
    +      case o: ContinuousRateStreamOffset => o.partitionToStartValue
    +      case s: SerializedOffset => Serialization.read[Map[Int, Long]](s.json)
    +      case _ => throw new IllegalArgumentException("invalid offset type for ContinuousRateSource")
    +    }
    +    if (partitionStartMap.exists(_.keySet.size > numPartitions)) {
    +      throw new IllegalArgumentException("Start offset contained too many partitions.")
    +    }
    +    val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble
    +
    +    Range(0, numPartitions).map { n =>
    +      // If the offset doesn't have a value for this partition, start from the beginning.
    +      val start = partitionStartMap.flatMap(_.get(n)).getOrElse(0L + n)
    +      // Have each partition advance by numPartitions each row, with starting points staggered
    +      // by their partition index.
    +      RateStreamReadTask(start, n, numPartitions, perPartitionRate)
    +        .asInstanceOf[ReadTask[Row]]
    +    }.asJava
    +  }
    +
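The staggered partitioning in `createReadTasks` above can be modeled in a few lines of plain Scala (a toy model, not Spark code): partition n starts at value n and advances by numPartitions per row, so the partitions jointly emit 0, 1, 2, ... with no gaps or overlaps.

```scala
// Toy model of the rate stream partitioning scheme described above.
object StaggeredRate {
  // The first `count` values emitted by one partition: start at the partition
  // index, step by the total partition count.
  def valuesFor(partition: Int, numPartitions: Int, count: Int): Seq[Long] =
    (0 until count).map(i => partition.toLong + i.toLong * numPartitions)
}
```

For numPartitions = 5, partition 2 emits 2, 7, 12, and so on; the union of all five partitions' first four rows is exactly the contiguous range 0 through 19, which is why restarting each partition from its own recorded start value loses nothing.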
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19925#discussion_r156740666

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memoryV2.scala ---
    @@ -0,0 +1,185 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.mutable
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.Row
    +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
    +import org.apache.spark.sql.sources.v2.{ContinuousWriteSupport, DataSourceV2, DataSourceV2Options, MicroBatchWriteSupport}
    +import org.apache.spark.sql.sources.v2.writer._
    +import org.apache.spark.sql.streaming.OutputMode
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
    + * tests and does not provide durability.
    + */
    +class MemorySinkV2 extends DataSourceV2
    +  with MicroBatchWriteSupport with ContinuousWriteSupport with Logging {
    +
    +  override def createMicroBatchWriter(
    +      queryId: String,
    +      batchId: Long,
    +      schema: StructType,
    +      mode: OutputMode,
    +      options: DataSourceV2Options): java.util.Optional[DataSourceV2Writer] = {
    +    java.util.Optional.of(new MemoryWriter(this, batchId, mode))
    +  }
    +
    +  override def createContinuousWriter(
    +      queryId: String,
    +      schema: StructType,
    +      mode: OutputMode,
    +      options: DataSourceV2Options): java.util.Optional[ContinuousWriter] = {
    +    java.util.Optional.of(new ContinuousMemoryWriter(this, mode))
    +  }
    +
    +  private case class AddedData(batchId: Long, data: Array[Row])
    +
    +  /** An ordered list of batches that have been written to this [[Sink]]. */
    +  @GuardedBy("this")
    +  private val batches = new ArrayBuffer[AddedData]()
    +
    +  /** Returns all rows that are stored in this [[Sink]]. */
    +  def allData: Seq[Row] = synchronized {
    +    batches.flatMap(_.data)
    +  }
    +
    +  def latestBatchId: Option[Long] = synchronized {
    +    batches.lastOption.map(_.batchId)
    +  }
    +
    +  def latestBatchData: Seq[Row] = synchronized {
    +    batches.lastOption.toSeq.flatten(_.data)
    +  }
    +
    +  def toDebugString: String = synchronized {
    +    batches.map { case AddedData(batchId, data) =>
    +      val dataStr = try data.mkString(" ") catch {
    +        case NonFatal(e) => "[Error converting to string]"
    +      }
    +      s"$batchId: $dataStr"
    +    }.mkString("\n")
    +  }
    +
    +  def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): Unit = {
    +    val notCommitted = synchronized {
    +      latestBatchId.isEmpty || batchId > latestBatchId.get
    +    }
    +    if (notCommitted) {
    +      logDebug(s"Committing batch $batchId to $this")
    +      outputMode match {
    +        case Append | Update =>
    +          val rows = AddedData(batchId, newRows)
    +          synchronized { batches += rows }
    +
    +        case Complete =>
    +          val rows = AddedData(batchId, newRows)
    +          synchronized {
    +            batches.clear()
    +            batches += rows
    +          }
    +
    +        case _ =>
    +          throw new IllegalArgumentException(
    +            s"Output mode $outputMode is not supported by MemorySink")
    +      }
    +    } else {
    +      logDebug(s"Skipping already committed batch: $batchId")
    +    }
    +  }
    +
    +  def clear(): Unit = synchronized {
    +    batches.clear()
    +  }
    +
    +  override def toString(): String = "MemorySink"
    +}
    +
    +case cl
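The commit semantics of `MemorySinkV2.write` can be captured in a Spark-free miniature (a sketch with toy names, not the actual Spark classes): Append/Update accumulate batches, Complete replaces all prior data, and re-delivery of an already-committed batchId is skipped.

```scala
// Miniature of the MemorySinkV2 commit logic quoted above.
object MiniMemorySink {
  sealed trait Mode
  case object Append extends Mode
  case object Update extends Mode
  case object Complete extends Mode

  private case class AddedData(batchId: Long, data: Seq[String])
  private val batches = scala.collection.mutable.ArrayBuffer.empty[AddedData]

  def latestBatchId: Option[Long] = batches.lastOption.map(_.batchId)
  def allData: Seq[String] = batches.flatMap(_.data).toSeq

  def write(batchId: Long, mode: Mode, rows: Seq[String]): Unit = {
    // A batchId at or below the latest one was already committed; it is a
    // replay during failure recovery and, since the batch always contains
    // the same records, it is safe to skip.
    val notCommitted = latestBatchId.forall(batchId > _)
    if (notCommitted) {
      mode match {
        case Append | Update =>
          batches += AddedData(batchId, rows)
        case Complete =>
          batches.clear()
          batches += AddedData(batchId, rows)
      }
    }
  }
}
```

The skip-if-committed check is what makes batch re-execution after a crash idempotent from the sink's point of view.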
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r156471697

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
    @@ -237,7 +237,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
             "is not supported in streaming DataFrames/Datasets and will be disabled.")
         }

    -    new StreamingQueryWrapper(new StreamExecution(
    +    new StreamingQueryWrapper(new MicroBatchExecution(
    --- End diff --

    Sorry, I'm not sure what you have in mind here.
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r156470973

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -447,296 +384,6 @@ class StreamExecution(
         }
       }

    -  /**
    -   * Populate the start offsets to start the execution at the current offsets stored in the sink
    -   * (i.e. avoid reprocessing data that we have already processed). This function must be called
    -   * before any processing occurs and will populate the following fields:
    -   *  - currentBatchId
    -   *  - committedOffsets
    -   *  - availableOffsets
    -   * The basic structure of this method is as follows:
    -   *
    -   * Identify (from the offset log) the offsets used to run the last batch
    -   * IF last batch exists THEN
    -   *   Set the next batch to be executed as the last recovered batch
    -   *   Check the commit log to see which batch was committed last
    -   *   IF the last batch was committed THEN
    -   *     Call getBatch using the last batch start and end offsets
    -   *     // above line is needed since some sources assume last batch always re-executes
    -   *     Setup for a new batch i.e., start = last batch end, and identify new end
    -   *   DONE
    -   * ELSE
    -   *   Identify a brand new batch
    -   * DONE
    -   */
    -  private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
    -    offsetLog.getLatest() match {
    --- End diff --

    The offset log right now has a strict schema that commit information wouldn't fit in. I was planning to keep both logs in the continuous implementation.
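The recovery decision sketched in the quoted doc comment reduces to a small pure function over the latest entries of the two logs (a standalone model with toy names, not the Spark implementation):

```scala
// Which batch should run next, given the latest batchId planned in the
// offset log and the latest batchId recorded in the commit log?
object StartOffsets {
  def nextBatchId(latestPlanned: Option[Long], latestCommitted: Option[Long]): Long =
    (latestPlanned, latestCommitted) match {
      // Last planned batch was fully committed: start the following batch.
      case (Some(planned), Some(committed)) if committed == planned => planned + 1
      // A batch was planned but never committed: re-run it, since some
      // sources assume the last batch always re-executes.
      case (Some(planned), _) => planned
      // Empty offset log: brand new query, start from batch 0.
      case (None, _) => 0L
    }
}
```

Keeping offsets and commits in separate logs, as the comment explains, is what lets this function distinguish "planned but not committed" from "fully committed".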
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19926#discussion_r156468624

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -71,27 +68,29 @@ class StreamExecution(
       import org.apache.spark.sql.streaming.StreamingQueryListener._

    -  private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
    +  protected val pollingDelayMs: Long = sparkSession.sessionState.conf.streamingPollingDelay

    -  private val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
    +  protected val minBatchesToRetain: Int = sparkSession.sessionState.conf.minBatchesToRetain
    --- End diff --

    We may want to tweak the variable name, but continuous processing will still need to know how long it should retain commit and offset log entries. Unfortunately we're stuck with the config name, and I don't think it makes sense to introduce a second parallel one doing the same thing.
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
GitHub user joseph-torres opened a pull request:

    https://github.com/apache/spark/pull/19925

    [SPARK-22732] Add Structured Streaming APIs to DataSourceV2

    ## What changes were proposed in this pull request?

    This PR provides DataSourceV2 API support for structured streaming, including new pieces needed to support continuous processing [SPARK-20928]. High level summary:

    - DataSourceV2 includes new mixins to support micro-batch and continuous reads and writes. For reads, we accept an optional user specified schema rather than using the ReadSupportWithSchema model, because doing so would severely complicate the interface.
    - DataSourceV2Reader includes new interfaces to read a specific microbatch or read continuously from a given offset. These follow the same setter pattern as the existing Supports* mixins so that they can work with SupportsScanUnsafeRow.
    - DataReader (the per-partition reader) has a new subinterface ContinuousDataReader only for continuous processing. This reader has a special method to check progress, and next() blocks for new input rather than returning false.
    - Offset, an abstract representation of position in a streaming query, is ported to the public API. (Each type of reader will define its own Offset implementation.)
    - DataSourceV2Writer has a new subinterface ContinuousWriter only for continuous processing. Commits to this interface come tagged with an epoch number, as the execution engine will continue to produce new epoch commits as the task continues indefinitely.

    Note that this PR does not propose to change the existing DataSourceV2 batch API, or deprecate the existing streaming source/sink internal APIs in spark.sql.execution.streaming.

    ## How was this patch tested?

    Toy implementations of the new interfaces with unit tests.

    You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/joseph-torres/spark continuous-api

    Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/spark/pull/19925.patch

    To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

        This closes #19925

    All commits below are authored by Jose Torres <j...@databricks.com>.

    commit daa3a78ad4dd7ecfc73f5b1dd050388c07b42771 (2017-12-05T18:48:20Z)
        add tests
    commit edae89508ec2bf02fba00a264cb774b0d60fb068 (2017-12-05T19:35:36Z)
        writer impl
    commit 9b28c524b343018d20d2d8d3c9ed4d3c530c413f (2017-12-05T19:37:24Z)
        rm useless writer
    commit 7ceda9d63b9914cfd275fc4240fa9c696afa05d1 (2017-12-05T21:02:32Z)
        rm weird docs
    commit ff7be6914560968af7f2179c3704446c771fad52 (2017-12-05T21:59:50Z)
        shuffle around public interfaces
    commit 4ae516a61af903c37b748a3941c2472d20776ce4 (2017-12-05T22:02:01Z)
        fix imports
    commit a8ff2ee9eeb992f6c0806cb2b4f33b976ef51cf5 (2017-12-05T22:40:15Z)
        put deserialize in reader so we don't have to port SerializedOffset
    commit 5096d3d551aa4479bfb112b286683e28ec578f3c (2017-12-05T23:51:08Z)
        off by one errors grr
    commit da00f6b5ddac8bd6025076a67fd4716d9d070bf7 (2017-12-05T23:55:58Z)
        document right semantics
    commit 1526f433837de78f59009b6632b6920de38bb1b0 (2017-12-06T00:08:54Z)
        document checkpoint location
    commit 33b619ca4f9aa1a82e3830c6e485b8298ca9ff50 (2017-12-06T00:43:36Z)
        add getStart to continuous and clarify semantics
    commit 083b04004f58358b3f6e4c82b4690ca5cf2da764 (2017-12-06T17:23:34Z)
        cleanup offset set/get docs
    commit 4d6244d2ae431f6043de97f322ce1c33090c (2017-12-06T17:32:45Z)
        cleanup reader docs
    commit 5f9df4f1b54cbd0570d0df5567c42ac2575009a5 (2017-12-06T18:06:44Z)
        explain getOffset
    commit a2323e95ff2d407877ded07b7537bac5b63dda8f (2017-12-06T21:17:43Z)
        fix fmt
    commit b80c75cd698cbe4840445efb78a662f02f355a99 (2017-12-06T21:24:35Z)
        fix doc
    commit 03bd69da4b0450e5fec88f4196998e3075e98edc (2017-12-06T21:39:20Z)
        note interfaces are temporary
    commit c7bc6a37914312666259bb9724aa710392
[GitHub] spark pull request #19926: [SPARK-22733] Split StreamExecution into MicroBat...
GitHub user joseph-torres opened a pull request:

    https://github.com/apache/spark/pull/19926

    [SPARK-22733] Split StreamExecution into MicroBatchExecution and StreamExecution.

    ## What changes were proposed in this pull request?

    StreamExecution is now an abstract base class, which MicroBatchExecution (the current StreamExecution) inherits. When continuous processing is implemented, we'll have a new ContinuousExecution implementation of StreamExecution. A few fields are also renamed to make them less microbatch-specific.

    ## How was this patch tested?

    Refactoring only.

    You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/joseph-torres/spark continuous-refactor

    Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/spark/pull/19926.patch

    To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

        This closes #19926

    commit 22d93b7d6133bffb271e6db300b936ae4dda74ab
    Author: Jose Torres <j...@databricks.com>
    Date: 2017-12-07T22:08:28Z

        Refactor StreamExecution into a parent class so continuous processing can extend it
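The shape of the refactor is the classic template-method split (a minimal sketch with toy names, not the actual Spark classes): the abstract base owns the shared query lifecycle, and each execution mode supplies only its own activated-stream loop.

```scala
// Toy illustration of the StreamExecution / MicroBatchExecution split.
abstract class StreamExec {
  private var started = false

  // Mode-specific loop, supplied by MicroBatchExec (and later a
  // continuous-processing subclass).
  protected def runActivatedStream(): Unit

  // Shared lifecycle: setup (checkpoint dirs, listeners, logs...) then the
  // mode-specific loop. Subclasses cannot change this skeleton.
  final def runStream(): Unit = {
    started = true
    runActivatedStream()
  }

  def isStarted: Boolean = started
}

class MicroBatchExec extends StreamExec {
  var batchesRun = 0
  override protected def runActivatedStream(): Unit = { batchesRun += 1 }
}
```

The payoff is that a future `ContinuousExec` only has to override `runActivatedStream` with an RPC-driven epoch loop; all recovery and lifecycle code stays in the base class.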
[GitHub] spark issue #19925: [SPARK-22732] Add Structured Streaming APIs to DataSourc...
Github user joseph-torres commented on the issue: https://github.com/apache/spark/pull/19925

    /cc @marmbrus @cloud-fan @rxin @brkyvz @zsxwing
[GitHub] spark issue #19926: [SPARK-22733] Split StreamExecution into MicroBatchExecu...
Github user joseph-torres commented on the issue: https://github.com/apache/spark/pull/19926

    /cc @brkyvz @zsxwing
[GitHub] spark issue #19925: [SPARK-22732] Add Structured Streaming APIs to DataSourc...
Github user joseph-torres commented on the issue: https://github.com/apache/spark/pull/19925

    retest this please
[GitHub] spark pull request #19925: [SPARK-22732] Add Structured Streaming APIs to Da...
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19925#discussion_r155867163

    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java ---
    @@ -0,0 +1,58 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.sources.v2;
    +
    +import java.util.Optional;
    +
    +import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
    +import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
    +import org.apache.spark.sql.streaming.OutputMode;
    +import org.apache.spark.sql.types.StructType;
    +
    +/**
    + * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
    + * provide data writing ability and save the data from a microbatch to the data source.
    + */
    +@InterfaceStability.Evolving
    +public interface MicroBatchWriteSupport extends BaseStreamingSink {
    +
    +  /**
    +   * Creates an optional {@link DataSourceV2Writer} to save the data to this data source. Data
    +   * sources can return None if there is no writing needed to be done.
    +   *
    +   * @param queryId A unique string for the writing query. It's possible that there are many
    +   *                writing queries running at the same time, and the returned
    +   *                {@link DataSourceV2Writer} can use this id to distinguish itself from others.
    +   * @param epochId The unique numeric ID of the batch within this writing query. This is an
    +   *                incrementing counter representing a consistent set of data; the same batch may
    +   *                be started multiple times in failure recovery scenarios, but it will always
    +   *                contain the same records.
    +   * @param schema the schema of the data to be written.
    +   * @param mode the output mode which determines what successive batch output means to this
    +   *             source, please refer to {@link OutputMode} for more details.
    --- End diff --

    Good point. Fixed here and in ContinuousWriteSupport.
[GitHub] spark pull request #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider....
GitHub user joseph-torres opened a pull request:

    https://github.com/apache/spark/pull/19611

    [SPARK-22305] Write HDFSBackedStateStoreProvider.loadMap non-recursively

    ## What changes were proposed in this pull request?

    Write HDFSBackedStateStoreProvider.loadMap non-recursively. This prevents stack overflow if too many deltas stack up in a low memory environment.

    ## How was this patch tested?

    Existing unit tests for functional equivalence, plus a new unit test to check for stack overflow.

    You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/joseph-torres/spark SPARK-22305

    Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/spark/pull/19611.patch

    To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

        This closes #19611

    commit 6366347516853abd7afd7e89452e656b1011cf6e
    Author: Jose Torres <j...@databricks.com>
    Date: 2017-10-30T15:48:13Z

        rewrite loadMap iteratively

    commit 33ea2fb59f5ad47ed4713ca73945a9630486677c
    Author: Jose Torres <j...@databricks.com>
    Date: 2017-10-30T16:28:28Z

        add test exercising stack overflow
[GitHub] spark issue #19611: [SPARK-22305] Write HDFSBackedStateStoreProvider.loadMap...
Github user joseph-torres commented on the issue: https://github.com/apache/spark/pull/19611

    One issue I want to explicitly bring up: this new unit test takes very long, almost 2 minutes on my computer. Creating 10k files isn't going to be super fast no matter what we do, but is there something that can mitigate the problem? Maybe it'd be better in a different suite or something; I don't know what the typical practice is.
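The recursive-to-iterative rewrite this PR describes can be sketched with a toy state store (a sketch under assumed names; `snapshot` and `delta` here stand in for the provider's real file readers): walk back through delta versions until a snapshot (or version 0), then replay the collected deltas oldest-first. The recursive version of this walk is what overflowed the stack when thousands of deltas accumulated.

```scala
// Toy model of loading a versioned key-value state non-recursively.
object IterativeLoad {
  type State = Map[String, Int]

  def loadMap(version: Long,
              snapshot: Long => Option[State],  // stand-in for reading a snapshot file
              delta: Long => State              // stand-in for reading a delta file
             ): State = {
    // Phase 1: iterate backwards, collecting delta versions to replay,
    // until a snapshot is found or we run out of versions.
    var v = version
    var base: State = Map.empty
    val pending = scala.collection.mutable.ArrayBuffer.empty[Long]
    var foundSnapshot = false
    while (v > 0 && !foundSnapshot) {
      snapshot(v) match {
        case Some(s) => base = s; foundSnapshot = true
        case None    => pending += v; v -= 1
      }
    }
    // Phase 2: replay the deltas oldest-first on top of the base state.
    pending.reverseIterator.foldLeft(base)((m, dv) => m ++ delta(dv))
  }
}
```

Depth of the loop is bounded only by available versions, not by stack size, which is the point of the fix.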
[GitHub] spark pull request #19581: [SPARK-22366] Support ignoring missing files
GitHub user joseph-torres opened a pull request: https://github.com/apache/spark/pull/19581 [SPARK-22366] Support ignoring missing files ## What changes were proposed in this pull request? Add a flag "spark.sql.files.ignoreMissingFiles" to parallel the existing flag "spark.sql.files.ignoreCorruptFiles". ## How was this patch tested? new unit test You can merge this pull request into a Git repository by running: $ git pull https://github.com/joseph-torres/spark SPARK-22366 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19581.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19581 commit ca059b09a561ee8295b282b7b577514fef101ef3 Author: Jose Torres <j...@databricks.com> Date: 2017-10-26T20:45:57Z Support ignoreMissingFiles
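Usage of the new flag might look like the following sketch. This is illustrative: `spark` is assumed to be an active SparkSession, and the path is hypothetical.

```scala
// Sketch: reading a directory where files may disappear between listing and
// reading (e.g. deleted by a retention job). With the flag set, the affected
// files are skipped instead of failing the task.
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")

// The pre-existing corrupt-file flag that this PR parallels:
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

val df = spark.read.parquet("/data/events")  // hypothetical path
df.count()  // missing files no longer abort the job with FileNotFoundException
```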
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158156114

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+    sparkSession: SparkSession,
+    name: String,
+    checkpointRoot: String,
+    analyzedPlan: LogicalPlan,
+    sink: ContinuousWriteSupport,
+    trigger: Trigger,
+    triggerClock: Clock,
+    outputMode: OutputMode,
+    extraOptions: Map[String, String],
+    deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+    sparkSession, name, checkpointRoot, analyzedPlan, sink,
+    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
+  override protected def sources: Seq[BaseStreamingSource] = continuousSources
+
+  override lazy val logicalPlan: LogicalPlan = {
+    assert(queryExecutionThread eq Thread.currentThread,
+      "logicalPlan must be initialized in StreamExecutionThread " +
+        s"but the current thread was ${Thread.currentThread}")
+    var nextSourceId = 0L
+    val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
+    analyzedPlan.transform {
+      case r @ StreamingRelationV2(
+          source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
+        toExecutionRelationMap.getOrElseUpdate(r, {
+          ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
+        })
+      case StreamingRelationV2(_, sourceName, _, _, _) =>
+        throw new AnalysisException(
+          s"Data source $sourceName does not support continuous processing.")
+    }
+  }
+
+  private val triggerExecutor = trigger match {
+    case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
+    case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
+  }
+
+  override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
+    do {
+      try {
+        runContinuous(sparkSessionForStream)
+      } catch {
+        case _: Throwable if state.get().equals(RECONFIGURING) =>
--- End diff --

The sequencing is:
- The pre-existing method stopSources() marks the ContinuousReader objects as stopped and cleans up any resources they may be holding. This doesn't affect query execution, and stopSources already swallows any non-fatal exception thrown by a stop() implementation.
-
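The swallow-and-restart pattern in the quoted `case _: Throwable if state.get().equals(RECONFIGURING)` branch can be sketched in isolation. This is a hypothetical miniature, not Spark's actual StreamExecution API; `runOnce` and the state names are illustrative.

```scala
import java.util.concurrent.atomic.AtomicReference

// Minimal sketch of the retry-on-reconfiguration loop: while the query is
// deliberately being reconfigured, an exception from the killed run is
// expected, so it is swallowed and the stream restarts with the new
// configuration. Any other failure propagates and fails the query.
object ReconfigureLoopSketch {
  sealed trait State
  case object ACTIVE extends State
  case object RECONFIGURING extends State

  val state = new AtomicReference[State](ACTIVE)

  /** Runs `runOnce` until it completes cleanly; returns the restart count. */
  def run(runOnce: () => Unit): Int = {
    var restarts = 0
    var done = false
    while (!done) {
      try {
        runOnce()
        done = true                      // clean finish: leave the loop
      } catch {
        // Only swallow if the failure came from a reconfiguration we
        // initiated ourselves.
        case _: Throwable if state.get() == RECONFIGURING =>
          restarts += 1
          state.set(ACTIVE)              // restart under the new configuration
      }
    }
    restarts
  }
}
```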
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158159270

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+    sparkSession: SparkSession,
+    name: String,
+    checkpointRoot: String,
+    analyzedPlan: LogicalPlan,
+    sink: ContinuousWriteSupport,
+    trigger: Trigger,
+    triggerClock: Clock,
+    outputMode: OutputMode,
+    extraOptions: Map[String, String],
+    deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+    sparkSession, name, checkpointRoot, analyzedPlan, sink,
+    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
+  override protected def sources: Seq[BaseStreamingSource] = continuousSources
+
+  override lazy val logicalPlan: LogicalPlan = {
+    assert(queryExecutionThread eq Thread.currentThread,
+      "logicalPlan must be initialized in StreamExecutionThread " +
+        s"but the current thread was ${Thread.currentThread}")
+    var nextSourceId = 0L
+    val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
+    analyzedPlan.transform {
+      case r @ StreamingRelationV2(
+          source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
+        toExecutionRelationMap.getOrElseUpdate(r, {
+          ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
+        })
+      case StreamingRelationV2(_, sourceName, _, _, _) =>
+        throw new AnalysisException(
+          s"Data source $sourceName does not support continuous processing.")
+    }
+  }
+
+  private val triggerExecutor = trigger match {
+    case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
--- End diff --

I think the coupling is correct here. ProcessingTime represents the rate of progress through the query's fenceposts, which applies here as well as it does in the microbatch case.
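The fencepost semantics being defended above can be sketched with a single function. This is an illustrative reconstruction of the fixed-rate scheduling idea, not a quote of Spark's ProcessingTimeExecutor.

```scala
// Sketch of ProcessingTime semantics: triggers fire at fixed fenceposts
// (multiples of the interval), regardless of how long the previous batch or
// epoch took. Name and signature are illustrative.
object ProcessingTimeSketch {
  /** Next fencepost strictly after `now`, for fenceposts at multiples of `intervalMs`. */
  def nextBatchTime(now: Long, intervalMs: Long): Long =
    now / intervalMs * intervalMs + intervalMs
}
```

Under this reading, the executor sleeps until `nextBatchTime(clock.now, interval)` whether the work between fenceposts is a microbatch or a continuous epoch, which is why reusing ProcessingTimeExecutor is coherent.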
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r158158855

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.streaming.ProcessingTime
+import org.apache.spark.util.{SystemClock, ThreadUtils}
+
+class ContinuousDataSourceRDD(
+    sc: SparkContext,
+    sqlContext: SQLContext,
+    @transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  override protected def getPartitions: Array[Partition] = {
+    readTasks.asScala.zipWithIndex.map {
+      case (readTask, index) => new DataSourceRDDPartition(index, readTask)
+    }.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
+    val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
+
+    val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
+
+    // This queue contains two types of messages:
+    // * (null, null) representing an epoch boundary.
+    // * (row, off) containing a data row and its corresponding PartitionOffset.
+    val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize)
+
+    val epochPollFailed = new AtomicBoolean(false)
+    val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+      s"epoch-poll--${runId}--${context.partitionId()}")
+    val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed)
+    epochPollExecutor.scheduleWithFixedDelay(
+      epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
+
+    // Important sequencing - we must get start offset before the data reader thread begins
+    val startOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset
+
+    val dataReaderFailed = new AtomicBoolean(false)
+    val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed)
+    dataReaderThread.setDaemon(true)
+    dataReaderThread.start()
+
+    context.addTaskCompletionListener(_ => {
+      reader.close()
+      dataReaderThread.interrupt()
+      epochPollExecutor.shutdown()
+    })
+
+    val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
+    new Iterator[UnsafeRow] {
+      private var currentRow: UnsafeRow = _
+      private var currentOffset: PartitionOffset = startOffset
+      private var currentEpoch =
+        context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+      override def hasNext(): Boolean = {
+        if (dataReaderFailed.get()) {
+          throw new SparkException("data read failed", dataReaderThread.failureReason)
+        }
+        if
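The two-message-type queue protocol described in the diff, where `(null, null)` marks an epoch boundary and `(row, off)` carries data, can be sketched in miniature. Types here are deliberately simplified (String rows, boxed Long offsets); this is an illustration of the sentinel pattern, not Spark's actual classes.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Miniature of the queue protocol: producer threads enqueue (row, offset)
// pairs, the epoch-poll thread enqueues (null, null) markers, and the
// consumer seals one epoch of rows each time it takes the sentinel.
object EpochQueueSketch {
  // (null, null) represents an epoch boundary; anything else is data.
  type Message = (String, java.lang.Long)

  /** Blocks on `queue`, returning `epochs` sealed batches of rows in order. */
  def consumeEpochs(queue: ArrayBlockingQueue[Message], epochs: Int): List[List[String]] = {
    var result = List.empty[List[String]]
    var current = List.empty[String]
    var remaining = epochs
    while (remaining > 0) {
      queue.take() match {
        case (null, null) =>   // epoch boundary: seal the open batch
          result = result :+ current
          current = Nil
          remaining -= 1
        case (row, _) =>       // data row: buffer it in the open epoch
          current = current :+ row
      }
    }
    result
  }
}
```

Because `ArrayBlockingQueue` is bounded, a slow consumer applies backpressure to both the data reader and the epoch-poll thread, which mirrors why the real queue's size is a configuration knob.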
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Github user joseph-torres commented on the issue: https://github.com/apache/spark/pull/19984 The result says it fails Spark unit tests, but clicking through shows a count of 0.
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Github user joseph-torres commented on the issue: https://github.com/apache/spark/pull/19984 retest this please