[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16953304#comment-16953304 ]

Ladislav Jech commented on SPARK-24036:
---------------------------------------

Hi [~joseph.torres] - any update on this work guys?

> Stateful operators in continuous processing
> -------------------------------------------
>
> Key: SPARK-24036
> URL: https://issues.apache.org/jira/browse/SPARK-24036
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 3.0.0
> Reporter: Jose Torres
> Priority: Major
>
> The first iteration of continuous processing in Spark 2.3 does not work with
> stateful operators.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16894323#comment-16894323 ]

Kevin Zhang commented on SPARK-24036:
-------------------------------------

Hi [~joseph.torres], is there any update on this work? Will the new feature be included in Spark 3.0?
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16489727#comment-16489727 ]

Jose Torres commented on SPARK-24036:
-------------------------------------

That's out of scope - the shuffle reader and writer work in this Jira would still be needed on top.

> Stateful operators in continuous processing
> -------------------------------------------
>
> Key: SPARK-24036
> URL: https://issues.apache.org/jira/browse/SPARK-24036
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.4.0
> Reporter: Jose Torres
> Priority: Major
>
> The first iteration of continuous processing in Spark 2.3 does not work with
> stateful operators.
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16489700#comment-16489700 ]

Arun Mahadevan commented on SPARK-24036:
----------------------------------------

If I understand correctly, a continuous job would have a single stage, with tasks running at the same time and shuffling data around (making use of the "TaskInfo" to figure out the endpoints). Does this mean we cannot reuse the existing shuffle infra, since it only makes sense when there are multiple stages?

Does SPARK-24374 plan to provide the shuffle infra to move data around, or is that out of scope?
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16489648#comment-16489648 ]

Jose Torres commented on SPARK-24036:
-------------------------------------

I've been notified of [https://issues.apache.org/jira/projects/SPARK/issues/SPARK-24374], a SPIP for an API which would provide much of what we need here wrt letting tasks know where the appropriate shuffle endpoints are.
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16479122#comment-16479122 ]

Apache Spark commented on SPARK-24036:
--------------------------------------

User 'xuanyuanking' has created a pull request for this issue:
https://github.com/apache/spark/pull/21353
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16470552#comment-16470552 ]

Jose Torres commented on SPARK-24036:
-------------------------------------

My concern isn't that we'll have to write more code, but that changing scheduler internals expands the surface area of interactions that need to be considered. For example, can we confidently enumerate all the ways in which the scheduler assumes a Dependency defines a stage boundary? If so, can we change all of them in a way that doesn't impact non-continuous-processing code at all? We'd have to consider a lot of questions like that, and I don't see any large benefit we'd get from doing so.

Glad to take a look at your preview PR.
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16470067#comment-16470067 ]

Li Yuanjian commented on SPARK-24036:
-------------------------------------

I agree with the division into kinds of tasks; that's quite clear. But maybe all of this could be made as transparent to the scheduler as possible by reusing the ResultTask and ShuffleMapTask design. Could the DAGScheduler use a ContinuousShuffleMapTask in place of the original ShuffleMapTask?

{quote}Changing DAGScheduler to accommodate continuous processing would create significant additional complexity I don't think we can really justify.
{quote}

So, in my opinion, it may not be as complex as we think? If I'm wrong, please let me know. :)

{quote}Whether we need to write an explicit shuffle RDD class or not would I think come down to an implementation detail of SPARK-24236. It depends on what's the cleanest way to unfold the SparkPlan tree.
{quote}

Yep, I couldn't agree more. I'll tidy up this part of our internal code and put up a preview PR. We'd very much appreciate any opinions you have!
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16469863#comment-16469863 ]

Jose Torres commented on SPARK-24036:
-------------------------------------

The way I was envisioning it, there would be four kinds of tasks when we're done:
 * reader-only, which has a ContinuousDataReader at the bottom and one of the new queue writers at the top
 * intermediate, which has one of the new queue readers at the bottom and one of the new queue writers at the top
 * writer-only, which has one of the new queue readers at the bottom and a DataWriter (to the remote data sink) at the top
 * reader-writer, which has a ContinuousDataReader at the bottom and a DataWriter at the top

But each of these would be implemented as partitions of the ContinuousWriteRDD, allowing all of this to be opaque to the scheduler. Changing DAGScheduler to accommodate continuous processing would create significant additional complexity I don't think we can really justify.

Whether we need to write an explicit shuffle RDD class or not would I think come down to an implementation detail of SPARK-24236. It depends on what's the cleanest way to unfold the SparkPlan tree.
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16469830#comment-16469830 ]

Li Yuanjian commented on SPARK-24036:
-------------------------------------

Hi [~joseph.torres]

Thanks for cc'ing me, this looks great! My doc may already cover sub-tasks SPARK-24237 and SPARK-23236; could you have a look at the design: [design link|https://docs.google.com/document/d/14cGJ75v9myznywtB35ytEqL9wHy9xfZRv06B6g2tUgI/edit#bookmark=id.2lfv2glj7ny0]? I'll take these two Jiras and discuss them with you in detail.

Also, in our practice, a new kind of continuous shuffle map task (I mentioned this in your doc comments: [comment link|https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit?disco=B4X1H_E]) and a shuffle RDD should be added. Do you agree to adding two more Jiras for these?
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16469801#comment-16469801 ]

Jose Torres commented on SPARK-24036:
-------------------------------------

[~XuanYuan] Since it seems we've reached broad consensus on the doc, I've added the relevant subtasks here. The stateful operator rewind is part of the "support single partition aggregates" PR I have out.
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16454788#comment-16454788 ]

Jose Torres commented on SPARK-24036:
-------------------------------------

https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE

I wrote a quick doc summarizing my thoughts. TLDR is:
 * I think it's better to not reuse the existing shuffle infrastructure - we'll have to do more work to get good performance later, but current shuffle has very bad characteristics for what continuous processing is trying to do. In particular I doubt we'd be able to maintain millisecond-scale latency with anything like UnsafeShuffleWriter.
 * It's a small diff on top of a working shuffle to support exactly-once state management. I don't think the coordinator needs to worry about stateful operators; a writer will never commit if a stateful operator below it fails to checkpoint, and the stateful operator itself can rewind if it commits an epoch that ends up failing.

Let me know what you two think. I'll send this out to the dev list if it looks reasonable, and then we can start thinking about how this breaks down into individual tasks.
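The second point in the comment above (the writer never commits when a stateful operator below it fails to checkpoint, and the operator rewinds when an epoch it checkpointed ends up failing) can be illustrated with a toy model. This is a plain-Python sketch under simplifying assumptions, not the design from the linked doc: `RunningSum`, `run_epoch` and the `checkpoint_ok` / `sink_ok` flags are hypothetical names, and failures are simulated with booleans.

```python
class RunningSum:
    """Toy stateful operator: a running sum with one snapshot per epoch."""

    def __init__(self):
        self.total = 0
        self.snapshots = {}  # epoch -> total at the end of that epoch

    def add(self, value):
        self.total += value

    def checkpoint(self, epoch):
        self.snapshots[epoch] = self.total

    def rewind_to(self, epoch):
        # Drop snapshots newer than `epoch` and restore that epoch's state
        # (or the initial state if no such snapshot exists).
        self.snapshots = {e: s for e, s in self.snapshots.items() if e <= epoch}
        self.total = self.snapshots.get(epoch, 0)


def run_epoch(op, epoch, rows, checkpoint_ok=True, sink_ok=True):
    """Process one epoch; return True only if the writer committed it."""
    for row in rows:
        op.add(row)
    if not checkpoint_ok:
        # The operator failed to checkpoint, so the writer must not commit.
        op.rewind_to(epoch - 1)
        return False
    op.checkpoint(epoch)
    if not sink_ok:
        # The operator checkpointed an epoch that ended up failing: rewind.
        op.rewind_to(epoch - 1)
        return False
    return True
```

In this model no coordinator logic is needed for the operator: a failed epoch simply leaves the operator at the last committed snapshot, so replaying the same rows produces the same result.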
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453290#comment-16453290 ]

Jungtaek Lim commented on SPARK-24036:
--------------------------------------

Btw, I would like to say that the idea of the iterator hack and the epoch RPC coordinator is awesome given the current goal, where only source offsets are stateful in a query.
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453209#comment-16453209 ]

Jungtaek Lim commented on SPARK-24036:
--------------------------------------

Maybe it's better to share what I've observed from continuous mode so far.
 * It leverages an iterator hack to form logical batches (epochs) in the stream.
 ** While the iterator works differently from normal, it doesn't touch existing operators, by assuming that all operators are chained and fit into a single stage.
 ** With this assumption, only WriteToContinuousDataSourceExec needs to know how to deal with the iterator hack.
 ** The assumption requires that there be no repartitioning, which most stateful operators need to deal with.
 * Because of the hack, it doesn't actually make epoch markers flow through the downstream operators.
 ** That is mandatory for applying distributed snapshots, but it might require a non-trivial change to the existing model, since a checkpoint would have to be handled by each stateful operator and stored in a distributed manner, and the coordinator would have to be able to check that snapshots from all tasks were taken correctly.
 ** This would be an unnecessary change for batch, and would make the existing model much more complicated.
 ** This would bring latency concerns, since each operator would have to stop processing while taking a snapshot. (I guess sending or storing the snapshot could still be done asynchronously.)
 ** If there is more than one upstream, the operator would have to align the upstreams so that the snapshot contains only the data belonging to the epoch.

So there is a huge challenge in extending continuous mode under the existing model to support stateful exactly-once (not end-to-end exactly-once, since that also depends on the sink), and I'd like to see a follow-up idea/design doc around continuous mode to see its direction: whether to keep relying on this assumption and explore further (which may need more hacks/workarounds), or to discard the assumption and redesign.

Most features are supported in micro-batch mode, so I would also like to see the goal of continuous mode. Is it to cover all or most of the features supported in micro-batch? Or is the goal of continuous mode only to cover low-latency use cases?
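The last sub-point of the comment above (with several upstreams, a snapshot must contain only the data of the current epoch) is the marker-alignment step of a distributed snapshot. A minimal Python sketch follows, assuming each upstream is an in-memory queue and each epoch ends with a marker object. `MARKER` and `align_epoch` are hypothetical names, and nothing here reflects how Spark implements epochs.

```python
from collections import deque

# Hypothetical sentinel that an upstream emits at the end of each epoch.
MARKER = object()


def align_epoch(upstreams):
    """Collect the rows that belong to the current epoch.

    Each upstream queue is consumed up to and including its next marker.
    Rows that arrived after the marker stay in the queue for the next epoch.
    """
    epoch_rows = []
    for q in upstreams:
        while q:
            item = q.popleft()
            if item is MARKER:
                break
            epoch_rows.append(item)
    return epoch_rows


# Usage: two upstreams whose markers arrive at different positions.
a = deque([1, 2, MARKER, 3])
b = deque([10, MARKER, 20, 30])
rows = align_epoch([a, b])  # rows 1, 2 and 10 belong to this epoch
```

A real operator would interleave the upstreams and stop reading from one only once its marker has arrived. That pause is where the latency concern in the comment comes from.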
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453058#comment-16453058 ]

Arun Mahadevan commented on SPARK-24036:
----------------------------------------

Hi [~joseph.torres], I am also interested in contributing to this effort if you are open to it.

> Supporting single partition aggregates. I have a substantially complete prototype of this in [https://github.com/jose-torres/spark/pull/13] - it doesn't really involve design as much as removing a very silly hack I put in earlier.

Does it require saving the aggregate state by injecting an epoch marker into the stream, or does it just work using the iterator approach since it involves only a single partition? To extend this to support multiple partitions and shuffles, shouldn't the epoch markers be injected into the stream, with the state save happening on receiving the markers from all the parent tasks?

> Just write RPC endpoints on both ends tossing rows around, optimizing for throughput later if needed. (I'm leaning towards this one.)

So buffering of the rows between the stages and handling back-pressure need to be considered here? Would the existing shuffle infrastructure make it easier to handle this?
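The buffering and back-pressure question in the comment above can be made concrete with a bounded buffer between a sending and a receiving task. The sketch below uses only Python's standard `queue` module. `try_send` is a hypothetical helper, and the buffer size of 2 is arbitrary.

```python
import queue


def try_send(buffer: queue.Queue, row) -> bool:
    """Try to hand a row to the downstream buffer without blocking.

    Returns False when the buffer is full, which is the signal for the
    sender to slow down or retry later (back-pressure).
    """
    try:
        buffer.put_nowait(row)
        return True
    except queue.Full:
        return False


# Usage: a buffer that holds at most two rows.
buf = queue.Queue(maxsize=2)
accepted = [try_send(buf, r) for r in ("r1", "r2", "r3")]
```

Here the third row is rejected until the receiver drains the buffer. Whether the sender should block, drop or spill in that situation is exactly the design question the comment raises.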
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16452576#comment-16452576 ]

Jose Torres commented on SPARK-24036:
-------------------------------------

The broader Spark community is of course always welcome to help.

The work here is generally split into three components:
 * Supporting single partition aggregates. I have a substantially complete prototype of this in [https://github.com/jose-torres/spark/pull/13] - it doesn't really involve design as much as removing a very silly hack I put in earlier.
 * Extending support to make continuous queries with multiple partitions run. My experimentation suggests that this only requires making ShuffleExchangeExec not cache its RDD in continuous mode, but I haven't strongly verified this.
 * Making the multiple partition aggregates truly continuous. ShuffleExchangeExec will of course insert a stage boundary, which means that latency will end up being bound by the checkpoint interval. What we need to do is create a new kind of shuffle for continuous processing which is non-blocking (cc [~liweisheng]). There are two possibilities here which I haven't evaluated in detail:
 ** Reuse the existing shuffle infrastructure, optimizing for latency later if needed.
 ** Just write RPC endpoints on both ends tossing rows around, optimizing for throughput later if needed. (I'm leaning towards this one.)

If you're interested in working on some of this, I can prioritize a design for that third part.
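The "endpoints on both ends tossing rows around" option from the comment above can be pictured as a writer that routes each row to one reader by key and tells every reader when an epoch ends. The Python sketch below replaces RPC with in-memory queues. `ShuffleWriter`, `EPOCH_MARKER` and the CRC-based partitioner are hypothetical choices made for illustration and are not taken from Spark.

```python
import zlib
from collections import deque

# Hypothetical sentinel appended to every reader queue when an epoch ends.
EPOCH_MARKER = ("__epoch_marker__",)


class ShuffleWriter:
    """Toy non-blocking shuffle writer over in-memory reader queues."""

    def __init__(self, reader_queues):
        self.reader_queues = reader_queues

    def _partition(self, key) -> int:
        # A stable hash, so the same key always reaches the same reader.
        return zlib.crc32(str(key).encode("utf-8")) % len(self.reader_queues)

    def write(self, key, value):
        # Rows are forwarded as soon as they are produced; there is no
        # stage boundary to wait for.
        self.reader_queues[self._partition(key)].append((key, value))

    def end_epoch(self):
        # Every reader must see the marker, even if it received no rows.
        for q in self.reader_queues:
            q.append(EPOCH_MARKER)


# Usage: three readers, rows for key "a" always land in the same queue.
queues = [deque(), deque(), deque()]
writer = ShuffleWriter(queues)
writer.write("a", 1)
writer.write("b", 2)
writer.write("a", 3)
writer.end_epoch()
```

This sketch favours latency over throughput, since rows are pushed one at a time. Batching rows per reader would be the natural later optimization.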
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16451653#comment-16451653 ]

Jungtaek Lim commented on SPARK-24036:
--------------------------------------

Hello, I'm quite interested in this issue, since I just read through the recent changes to continuous mode in the codebase and observed the same limitations. Do you have ideas or any design docs for this? Also, do you plan to share these tasks with the Spark community? I'm willing to contribute here, but it's completely OK if you plan to drive all of the tasks on your own.