> On 2011-12-12 20:44:14, Alejandro Abdelnur wrote:
> > The proposed approach seems like overkill when the following change would 
> > achieve the same without introducing new concurrent logic:
> > 
> > * The CallableQueueService would define a PriorityQueue with 4 priorities 
> > instead of 3.
> > * The queue() method would ensure that no command being queued has 
> > priority 4 (forbidding direct use of the highest priority).
> > * The CallableQueueService would have a new protected method 'Set<Class> 
> > getInterruptCommandClasses()' that would return the commands that are 
> > 'interrupt' commands.
> > * On queuing, within the queue() method, any command whose class is in the 
> > above Set will be bumped to priority 4.
> > 
> > This achieves exactly the same behavior with the current well-tested logic.
> >
> 
> Mohamed Battisha wrote:
>     I do not see it as overkill. It is exactly the same behavior you proposed, 
> but instead of using a high-priority queue, we are using a per-job interrupt 
> map. I tried as much as possible to implement it within the same theme. [I am 
> mapping the proposed behavior to the current design, which is almost 1:1]
>     * The CallableQueueService would define a PriorityQueue with 4 
> priorities instead of 3.
>     - Using an InterruptMap within the CallableQueueService.
>     
>     * The queue() method would ensure that no command being queued has 
> priority 4 (forbidding direct use of the highest priority).
>     - No change here, as we are not going to use a special queue. Instead, 
> while polling from the queue, we will check the map first.
>     
>     * The CallableQueueService would have a new protected method 'Set<Class> 
> getInterruptCommandClasses()' that would return the commands that are 
> 'interrupt' commands.
>     - The interrupt commands are stored in a local set within 
> CallableQueueService.
>     
>     * On queuing, within the queue() method, any command whose class is in 
> the above Set will be bumped to priority 4.
>     - On queuing, within the queue() method, if the command type belongs to 
> the InterruptSet, it will be inserted into the InterruptMap.
>     
>     
>     The major advantage of the map approach over the queue approach is that 
> we avoid job starvation, especially in large deployments in which hundreds of 
> thousands of commands are running from thousands of jobs. In that case, a few 
> jobs with interrupt commands may take full control of the thread pool, causing 
> other jobs to starve. The per-job map avoids such behavior.
> 
> Alejandro Abdelnur wrote:
>     Mohamed, 
>     
>     The CallableQueueService has anti-starvation logic already, handled by the 
> callable.concurrency settings. This ensures that a single command never takes 
> over all threads. Because of this mechanism, what you point out as an issue 
> is not one.
>     
>     So again, the alternate solution I'm suggesting restricts the changes to 
> one place (the CallableQueueService) and it uses the existing, well-proven 
> priority and concurrency mechanism handled by the CallableQueueService.
>     
>     Thanks.
>     
>     Alejandro
>

Alejandro,
I am assuming that, in your solution, the anti-starvation logic will not push 
other commands to the higher-priority queue, which will be restricted to 
interrupt commands.
If the anti-starvation logic does push other commands to the higher-priority 
queue, we are facing the same exact issue again; we are only using four queues 
instead of three.
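
The four-priority alternative under discussion can be sketched roughly as 
follows. This is only an illustration, not the actual CallableQueueService 
code: the class name PriorityBumpSketch, the string command types, and the 
0-based numbering (so the reserved "priority 4" is index 3) are all 
assumptions made for the example.

```java
import java.util.Set;

// Hypothetical stand-in for the queue() logic; not the real Oozie service.
class PriorityBumpSketch {
    // Priorities 0..2 are open to callers (assumed 0-based numbering).
    static final int MAX_CALLER_PRIORITY = 2;
    // Fourth level, reserved for interrupt commands.
    static final int INTERRUPT_PRIORITY = 3;

    // Plays the role of getInterruptCommandClasses(); strings keep it simple.
    private final Set<String> interruptTypes;

    PriorityBumpSketch(Set<String> interruptTypes) {
        this.interruptTypes = interruptTypes;
    }

    /** Returns the priority a command would actually be queued with. */
    int effectivePriority(String commandType, int requestedPriority) {
        // Callers may not ask for the reserved level directly.
        if (requestedPriority < 0 || requestedPriority > MAX_CALLER_PRIORITY) {
            throw new IllegalArgumentException(
                "priority must be 0.." + MAX_CALLER_PRIORITY + ": " + requestedPriority);
        }
        // Interrupt commands are bumped to the reserved level.
        return interruptTypes.contains(commandType)
            ? INTERRUPT_PRIORITY
            : requestedPriority;
    }
}
```

Note that the sketch says nothing about whether anti-starvation logic may 
later promote other commands into the reserved level, which is the open 
question raised above.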

The changes I made are mainly in the CallableQueueService, with the same exact 
logic you proposed [using a map instead of a higher-priority queue]. During my 
tests, I found that some commands do not check for a null reference in 
GetLockKey(). I added these checks to those commands to avoid null-reference 
exceptions in some rare conditions, which gives the impression of a bigger 
change :)

Thanks!
Mohamed


- Mohamed


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/3030/#review3851
-----------------------------------------------------------


On 2011-12-09 18:58:56, Mohamed Battisha wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/3030/
> -----------------------------------------------------------
> 
> (Updated 2011-12-09 18:58:56)
> 
> 
> Review request for oozie.
> 
> 
> Summary
> -------
> 
> Oozie continues to materialize new actions after end date modification. The 
> main issue is related to how Oozie materializes actions based on a FIFO 
> priority queue. Changing the status of a bundle/coordinator job should take 
> higher priority than executing this job. 
> 
> The main idea is to enable the queue to handle interruptions. That is, once 
> you have finished what you are currently working on, you should focus on 
> executing the interrupting action next. 
> 
> The issue can be illustrated as follows: 
> 
> 1. Configure a pipeline to run for 1 hour 
> 2. Start the pipeline 
> 3. After it starts materializing new actions, change the end time (in my 
> example - to 10 minutes after the pipeline starts) 
> 4. Monitor the coordinator apps - they will continue to materialize new 
> actions past the end time.
> 
> 
> This addresses bug OOZIE-591.
>     https://issues.apache.org/jira/browse/OOZIE-591
> 
> 
> Diffs
> -----
> 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/PollablePriorityDelayQueue.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/resources/oozie-default.xml 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java 1209829 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java 1209829 
> 
> Diff: https://reviews.apache.org/r/3030/diff
> 
> 
> Testing
> -------
> 
> Regression tests for all the commands, in addition to specific tests for the 
> interrupt-driven map.
> 
> 
> Thanks,
> 
> Mohamed
> 
>
