[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15873007#comment-15873007 ] ASF GitHub Bot commented on APEXCORE-570:

Github user asfgit closed the pull request at:
https://github.com/apache/apex-core/pull/445

> Prevent upstream operators from getting too far ahead when downstream operators are slow
>
> Key: APEXCORE-570
> URL: https://issues.apache.org/jira/browse/APEXCORE-570
> Project: Apache Apex Core
> Issue Type: Improvement
> Components: Buffer Server
> Reporter: Pramod Immaneni
> Assignee: Pramod Immaneni
>
> If the downstream operators are slower than upstream operators then the
> upstream operators will get ahead and the gap can continue to increase.
> Provide an option to slow down or temporarily pause the upstream operators
> when they get too far ahead.

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824628#comment-15824628 ] Pramod Immaneni commented on APEXCORE-570:

I see your point: you could disable back pressure on the bursty subscriber and get better throughput.
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15824455#comment-15824455 ] Pramod Immaneni commented on APEXCORE-570:

"Consider the case of two subscribers with the same throughput but different patterns, one with a constant and capped rate, the other with bursts of activity. If they are forced to read at the same rate, then both will be slower and overall throughput will be reduced? Granted, this is not the common scenario, but something to consider."

Yes, it would result in lower throughput, but since the operator is controlled by a single thread, I think back pressure on one stream will block the other.
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823435#comment-15823435 ] Thomas Weise commented on APEXCORE-570:

Pramod, the process-related discussion is now a thread on the mailing list. IMO it is an opportunity for improvement, not a reason to take offense. Your explanation of how you arrived at your solution is very good and valuable. The problem is that it remains only in your head unless you document it, which is what I asked for yesterday.

"Yes, you are correct, the slowest subscriber will slow down the publisher (unless it is parallel partition all the way through). But this is expected with back pressure, isn't it?"

Consider the case of two subscribers with the same throughput but different patterns, one with a constant and capped rate, the other with bursts of activity. If they are forced to read at the same rate, then both will be slower and overall throughput will be reduced? Granted, this is not the common scenario, but something to consider.
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823292#comment-15823292 ] Pramod Immaneni commented on APEXCORE-570:

I will go into the topic-specific comments first and then address your concern about my approach to the process.

"Now back to the topic: I think this is a good approach. It will avoid the fast-producing operator running ahead at the speed of writing to disk indefinitely. With this we will not need to limit the spooling at all?"

Correct, a spooling limit is not needed. A spooling size limit will not work as a back pressure mechanism because we do not know how much data will be generated between two commits (committed windows). Also, since the checkpoint length is configurable, we cannot set the limit to some "reasonable" high value. Hence, a spooling size limit would not be practical.

"What about the case where you have two subscribers (and those could be different operators) where one can keep up with the rate at which data is published and the other one may be slow, albeit maybe temporarily? This will slow down the fast subscriber and introduce latency."

Yes, you are correct, the slowest subscriber will slow down the publisher (unless it is parallel partition all the way through). But this is expected with back pressure, isn't it?

"Let’s first address the process issue (it may warrant a separate discussion and additions to the contributor guidelines also). If you think there was a conclusion, then this may indicate that there was offline discussion that isn’t captured here or anywhere else. Just by looking at this ticket, it is anything but clear what led to your PR. This is not how the community can work; discussion has to be in the open."

I think you have misunderstood my approach. When I created the JIRA that started the discussion, I was facing a problem with a production application and proposed using the window difference as the way to create back pressure and block the publisher. Limiting spooling was suggested as an approach by both you and David, and in my comment on 2 Nov 2016 at 22:53 I mentioned that it won't work because it will cause a deadlock. David had another comment on this approach, about suspending the publisher on reaching the spool limit until a commit happens, which is effectively the same deadlock problem, as the commit will not happen until the publisher moves forward. No other approaches were suggested, so I proceeded to solve the problem with the proposed window difference approach. As I got into the weeds of the implementation and figured out all the details of how the current implementation works, I realized that using the block difference instead of the window difference was a better way to accomplish this. To me, the fundamental approach I originally suggested, blocking the publisher until the subscriber catches up, hadn't changed; only an implementation detail had. Second, most of the implementation time was spent working out how to accomplish the task under the original assumption of a window difference and arriving at the conclusion to use blocks instead of windows; the actual coding took a day or two. So what you see in the PR is a relatively new discovery.

In your comments, you have made a couple of statements. The first is that there may have been offline discussions on the implementation. This has not happened, I assure you. You are all seeing the implementation at the same time, including the reviewers. The second statement, that this is detrimental to the community and that discussions have to be in the open, is stronger. I take personal offense at this statement. I know you want the best for the community, but I suggest you ascertain the truth before making such strong statements.
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823228#comment-15823228 ] Thomas Weise commented on APEXCORE-570:

Let’s first address the process issue (it may warrant a separate discussion and additions to the contributor guidelines also). If you think there was a conclusion, then this may indicate that there was offline discussion that isn’t captured here or anywhere else. Just by looking at this ticket, it is anything but clear what led to your PR. This is not how the community can work; discussion has to be in the open.

Now back to the topic: I think this is a good approach. It will avoid the fast-producing operator running ahead at the speed of writing to disk indefinitely. With this we will not need to limit the spooling at all?

What about the case where you have two subscribers (and those could be different operators) where one can keep up with the rate at which data is published and the other one may be slow, albeit maybe temporarily? This will slow down the fast subscriber and introduce latency.
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823182#comment-15823182 ] Pramod Immaneni commented on APEXCORE-570:

I am not sure what is left to conclude the discussion, but I will add this. There are two scenarios here: when buffer spooling is enabled and when it is disabled. This addresses the case when buffer spooling is enabled (the default case). The case with buffer spooling disabled has some other basic issues, even before back pressure comes into play, and therefore requires a different treatment; I am planning to look at that next. Those issues are captured in a separate JIRA, APEXCORE-609.

Let me describe the approach by first describing what happens today. The data in the buffer server is stored in blocks. For example, a 512MB buffer (the default) is divided into 8 blocks of 64MB each. The problem is that as the publisher publishes more data, the blocks it is done with are spooled to disk, their data memory is invalidated, and new blocks are allocated, with total data memory usage remaining at 512MB (the configured capacity). This is done regardless of where the subscribers are in terms of reading those blocks. If a subscriber hits a block that has been spooled, its data is loaded back into memory.

My approach is simply not to release blocks haphazardly when the publisher is done with them, but only after all subscribers have read them, and, if there are no more blocks left for the publisher, to suspend the publisher until the slowest subscriber has gone past the earliest block and released it.
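A minimal Java sketch of the block-based rule described above. It is a hypothetical model, not the code in the pull request: `BlockBackPressure`, `tryAdvancePublisher` and `subscriberFinishedBlock` are invented names, blocks are tracked only by index, and spooling to disk is left out entirely. The block count is capacity divided by block size (512MB / 64MB = 8 in the example given).

```java
import java.util.Arrays;

/**
 * Hypothetical model (not the Apex buffer server code) of block-based back
 * pressure: a fixed number of blocks, a block is released only after every
 * subscriber has read past it, and the publisher is suspended when it needs
 * a fresh block and none is free.
 */
class BlockBackPressure {
  private final int maxBlocks;       // e.g. 512 MB / 64 MB = 8
  private final long[] readerBlock;  // block index each subscriber is reading
  private long writerBlock = 0;      // block index the publisher is filling

  BlockBackPressure(long capacityBytes, long blockSizeBytes, int subscribers) {
    this.maxBlocks = (int) (capacityBytes / blockSizeBytes);
    this.readerBlock = new long[subscribers]; // all subscribers start at block 0
  }

  int maxBlocks() {
    return maxBlocks;
  }

  /** Blocks held: from the slowest subscriber's block up to the publisher's block. */
  synchronized int blocksInUse() {
    long slowest = Arrays.stream(readerBlock).min().orElse(writerBlock);
    return (int) (writerBlock - slowest + 1);
  }

  /** Non-blocking: move the publisher to a new block if one is free. */
  synchronized boolean tryAdvancePublisher() {
    if (blocksInUse() + 1 > maxBlocks) {
      return false; // the slowest subscriber still holds the oldest block
    }
    writerBlock++;
    return true;
  }

  /** Blocking variant: suspend the publisher until a block is released. */
  synchronized void advancePublisher() throws InterruptedException {
    while (!tryAdvancePublisher()) {
      wait();
    }
  }

  /** A subscriber finished its current block; only completed blocks can be left. */
  synchronized boolean subscriberFinishedBlock(int subscriber) {
    if (readerBlock[subscriber] >= writerBlock) {
      return false; // still the block the publisher is filling
    }
    readerBlock[subscriber]++;
    notifyAll(); // the oldest block may now be free: wake a waiting publisher
    return true;
  }

  public static void main(String[] args) {
    BlockBackPressure bp = new BlockBackPressure(512L << 20, 64L << 20, 2);
    int advanced = 0;
    while (bp.tryAdvancePublisher()) {
      advanced++;
    }
    // prints 7: block 0 plus 7 new blocks fill all 8 before the publisher stalls
    System.out.println("publisher advanced " + advanced + " times before stalling");
  }
}
```

In this model only the slowest subscriber matters: a fast subscriber finishing block 0 does not free it while another subscriber is still reading it.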
If you now have an app where the input operator can bring data into the DAG at a much faster rate than, let's say, an output operator can write to a store, you will see the individual buffers in the intermediate containers build up and those operators pause as they reach the limit, with the back pressure propagating all the way back to the input operator and eventually pausing it; as the output operator moves forward, the upstream operators resume. So the buffer memory limit specified by the configurable attribute controls when the back pressure kicks in. Of course, this is not an ideal way to design or run your app; you want an impedance match between your input and output, or at least avg(input rate) <= avg(output rate). But in scenarios where the application is not designed to do this, it prevents the upstream operators from running away. By the way, the feature can be disabled if not desired.
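A rough estimate of when back pressure kicks in, under a simplified model that is not from the thread: a single buffer of capacity C between the publisher and its slowest subscriber, constant average rates, and no release of blocks other than by reading. The numbers in the last line are hypothetical.

```latex
% Simplified model (assumption): one buffer of capacity C, publisher at average
% rate r_in, slowest subscriber at average rate r_out, both constant.
\begin{align*}
B(t) &= (r_{\mathrm{in}} - r_{\mathrm{out}})\,t, \qquad r_{\mathrm{in}} > r_{\mathrm{out}} \\
T_{\mathrm{bp}} &= \frac{C}{r_{\mathrm{in}} - r_{\mathrm{out}}} \\
C = 512\,\mathrm{MB},\ r_{\mathrm{in}} = 10\,\mathrm{MB/s},\ r_{\mathrm{out}} = 8\,\mathrm{MB/s}
  &\;\Rightarrow\; T_{\mathrm{bp}} = \frac{512\,\mathrm{MB}}{2\,\mathrm{MB/s}} = 256\,\mathrm{s}
\end{align*}
```

When avg(input rate) <= avg(output rate), the backlog B(t) does not grow on average and the buffer only absorbs bursts, which is the impedance-match case described in the comment.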
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823041#comment-15823041 ] Thomas Weise commented on APEXCORE-570:

I think it would be useful to conclude the discussion and outline the approach before raising a pull request.
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649116#comment-15649116 ] Pramod Immaneni commented on APEXCORE-570:

I will also look at how disabling buffer spooling can cause deadlocks.
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15630848#comment-15630848 ] David Yan commented on APEXCORE-570:

[~PramodSSImmaneni] Maybe we can't block it, but can't we suspend the calls to emitTuples, and also keep the window from progressing, until a commit happens if the spool reaches capacity?
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15630806#comment-15630806 ] Pramod Immaneni commented on APEXCORE-570:

[~davidyan] That would lead to a deadlock because old data cannot be deleted until it is committed.
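The deadlock argument can be made concrete with a toy model. The assumptions are mine, not from the discussion: every window writes the same number of bytes, subscribers keep up instantly so only a commit frees spool space, a commit happens every `checkpointInterval` windows once the publisher completes that window, and spooled data can be purged only up to the committed window. `SpoolLimitModel` and its parameters are hypothetical.

```java
/**
 * Toy model (hypothetical, not Apex code) of "suspend the publisher when the
 * spool is full until a commit happens". Data can be purged only up to the
 * committed window, and the next commit needs the publisher to finish the
 * next checkpoint window, so if one checkpoint interval's worth of data does
 * not fit in the spool, the publisher waits forever.
 */
class SpoolLimitModel {

  /**
   * Returns the last window the publisher completes (at most maxWindows).
   * Subscribers are assumed to keep up, so only commits free spool space.
   */
  static long lastCompletedWindow(long spoolCapacity, long bytesPerWindow,
                                  long checkpointInterval, long maxWindows) {
    long uncommittedBytes = 0; // spooled data that cannot be purged yet
    for (long window = 1; window <= maxWindows; window++) {
      if (uncommittedBytes + bytesPerWindow > spoolCapacity) {
        // Spool full: the publisher is suspended, but no commit can happen
        // until it finishes the current checkpoint interval -> deadlock.
        return window - 1;
      }
      uncommittedBytes += bytesPerWindow;
      if (window % checkpointInterval == 0) {
        uncommittedBytes = 0; // window committed, everything up to it purged
      }
    }
    return maxWindows;
  }

  public static void main(String[] args) {
    // 5 windows x 10 bytes per checkpoint interval fits in 100 bytes: prints 1000
    System.out.println(lastCompletedWindow(100, 10, 5, 1_000));
    // 20 windows x 10 bytes per interval does not fit: prints 10
    System.out.println(lastCompletedWindow(100, 10, 20, 1_000));
  }
}
```

Whether the limit deadlocks depends on how much data is produced between commits, which is not known in advance; that is why a spool size limit cannot be chosen safely.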
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15630749#comment-15630749 ] Munagala V. Ramanath commented on APEXCORE-570:

I think part of the problem is that applications are sometimes written so that upstream operators maintain state that stays a reasonable size as long as data flows through the pipeline normally. But if the downstream slows down, the state keeps growing to the point where the operator is deemed hung and killed, perhaps because checkpointing takes too long, or perhaps because it blows memory limits since spilling to disk is not happening fast enough. In such cases, this sort of throttling can be a useful coping mechanism.
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15630694#comment-15630694 ] David Yan commented on APEXCORE-570:

If the spooling capacity limit is reached, would the operator just get blocked? If so, wouldn't that solve the back pressure problem at least partially?
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15630269#comment-15630269 ] Pramod Immaneni commented on APEXCORE-570:

For the inter-process case, if the downstream operator is slower than the speed of spooling, the upstream operator pulls ahead. This is the scenario I am facing and working on addressing. It cannot be controlled with an absolute capacity limit (memory + spool) because, for fault tolerance, older windows are needed until they are committed. It would have to be a window difference: how far ahead of the minimum of all downstream operators the upstream operator is allowed to go before the brakes are applied to its publishing.
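The window-difference rule in this comment can be written down as a small predicate. This is a sketch under assumptions: window IDs are treated as plain increasing counters, `WindowDifferenceRule` and its methods are hypothetical names, and at least one downstream subscriber is assumed.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch of the window-difference rule: the publisher may begin
 * window w only if w minus the slowest downstream subscriber's window is
 * within maxWindowsAhead. Window IDs are treated as simple counters.
 */
class WindowDifferenceRule {

  /** Window reached by the slowest downstream subscriber (at least one required). */
  static long slowest(long[] subscriberWindows) {
    return Arrays.stream(subscriberWindows).min().getAsLong();
  }

  /** True if the publisher may begin publisherWindow without exceeding the limit. */
  static boolean mayBeginWindow(long publisherWindow, long[] subscriberWindows,
                                long maxWindowsAhead) {
    return publisherWindow - slowest(subscriberWindows) <= maxWindowsAhead;
  }

  public static void main(String[] args) {
    long[] subscribers = {113, 104}; // a fast and a slow downstream operator
    System.out.println(mayBeginWindow(114, subscribers, 10)); // true: 114 - 104 = 10
    System.out.println(mayBeginWindow(115, subscribers, 10)); // false: 115 - 104 = 11
  }
}
```

Because only the minimum matters, a fast subscriber already at window 113 does not let the publisher proceed; this is the latency concern raised elsewhere in this thread.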
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15630072#comment-15630072 ] Thomas Weise commented on APEXCORE-570:

From the JIRA description, this sounds like a combination of back pressure with a configurable limit on the queue. For the intra-process case, the queue capacity can be set. For the buffer server, it will spill over to disk, hence the upstream operator will produce at the speed of spooling. Are you thinking of a capacity limit here?
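For the intra-process case mentioned here, a bounded queue by itself supplies back pressure: once it is full, the producer's `put` blocks. The sketch uses the JDK's `java.util.concurrent.ArrayBlockingQueue` as a stand-in; it is not how Apex implements its in-process queues, and `BoundedQueueBackPressure.maxGap` is a hypothetical helper.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Illustration (not Apex internals) of back pressure from a bounded queue:
 * put() blocks when the queue is full, so a fast producer can never get more
 * than capacity tuples (+1 held by the consumer) ahead of a slow consumer.
 */
class BoundedQueueBackPressure {

  /** Runs producer and consumer; returns the largest produced-minus-consumed gap seen. */
  static int maxGap(int capacity, int tuples, long consumerDelayMillis)
      throws InterruptedException {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
    AtomicInteger consumed = new AtomicInteger();

    Thread consumer = new Thread(() -> {
      try {
        for (int i = 0; i < tuples; i++) {
          queue.take();
          Thread.sleep(consumerDelayMillis); // slow downstream operator
          consumed.incrementAndGet();
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();

    int largestGap = 0;
    for (int produced = 1; produced <= tuples; produced++) {
      queue.put(produced); // blocks while the queue is full
      largestGap = Math.max(largestGap, produced - consumed.get());
    }
    consumer.join();
    return largestGap;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("largest gap: " + maxGap(4, 50, 2));
  }
}
```

The buffer server behaves differently because it spools to disk instead of blocking, so the publisher only slows to the speed of spooling; that is the gap this JIRA targets.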
[jira] [Commented] (APEXCORE-570) Prevent upstream operators from getting too far ahead when downstream operators are slow
[ https://issues.apache.org/jira/browse/APEXCORE-570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15628460#comment-15628460 ] Pramod Immaneni commented on APEXCORE-570:

Here is an example of how to do this from within the application: https://github.com/PramodSSImmaneni/throttle

In this JIRA, the idea is to do this within the engine, in the buffer server itself, by providing a configurable option that limits how far, in number of windows, the upstream operator can get ahead of the downstream operator before it is blocked. Once the downstream operator catches up, the upstream operator will be unblocked.
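The block/unblock behavior proposed here can be sketched with a monitor. This is a hypothetical illustration, not the engine API and not the code in the linked throttle repository: `WindowGate`, `beforeBeginWindow` and `downstreamEndWindow` are invented names, there is a single downstream operator, and an unbounded `LinkedBlockingQueue` stands in for a spooling buffer that never pushes back on its own.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Hypothetical sketch (not the Apex engine API): block the upstream operator
 * once it is maxWindowsAhead windows ahead of the downstream operator, and
 * unblock it as the downstream operator catches up.
 */
class WindowGate {
  private final long maxWindowsAhead;
  private long downstreamWindow = 0; // last window fully processed downstream

  WindowGate(long maxWindowsAhead) {
    if (maxWindowsAhead < 1) {
      // With 0, upstream could never emit the window downstream is waiting for.
      throw new IllegalArgumentException("maxWindowsAhead must be >= 1");
    }
    this.maxWindowsAhead = maxWindowsAhead;
  }

  /** Upstream calls this before beginning windowId; waits while too far ahead. */
  synchronized void beforeBeginWindow(long windowId) throws InterruptedException {
    while (windowId - downstreamWindow > maxWindowsAhead) {
      wait();
    }
  }

  /** Downstream calls this after finishing windowId; wakes a blocked upstream. */
  synchronized void downstreamEndWindow(long windowId) {
    downstreamWindow = Math.max(downstreamWindow, windowId);
    notifyAll();
  }

  synchronized long downstreamWindow() {
    return downstreamWindow;
  }

  /** Runs a fast upstream against a slow downstream; returns the largest lead observed. */
  static long runDemo(long maxWindowsAhead, int windows, long downstreamDelayMillis)
      throws InterruptedException {
    WindowGate gate = new WindowGate(maxWindowsAhead);
    // Unbounded queue standing in for a spooling buffer: it never pushes back by itself.
    BlockingQueue<Long> stream = new LinkedBlockingQueue<>();
    Thread downstream = new Thread(() -> {
      try {
        for (int i = 0; i < windows; i++) {
          long window = stream.take();
          Thread.sleep(downstreamDelayMillis); // slow downstream operator
          gate.downstreamEndWindow(window);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    downstream.start();
    long maxLead = 0;
    for (long w = 1; w <= windows; w++) {
      gate.beforeBeginWindow(w); // blocks while w is too far ahead
      maxLead = Math.max(maxLead, w - gate.downstreamWindow());
      stream.put(w); // "emit" window w downstream
    }
    downstream.join();
    return maxLead;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("largest lead: " + runDemo(3, 20, 2) + " windows");
  }
}
```

Without the gate the upstream loop would finish immediately and the unbounded queue would absorb the whole backlog; with it, the lead never exceeds `maxWindowsAhead`.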