> On 2011-12-12 20:44:14, Alejandro Abdelnur wrote:
> > The proposed approach seems like overkill when the following change would
> > achieve the same result without introducing new concurrent logic:
> >
> > * The CallableQueueService would define a PriorityQueue with 4 priorities
> > instead of 3.
> > * The queue() method would ensure that no command being queued has
> > priority 4 (forbidding direct use of the highest priority).
> > * The CallableQueueService would have a new protected method 'Set<Class>
> > getInterruptCommandClasses()' that would return the commands that are
> > 'interrupt' commands.
> > * On queuing, within the queue() method, any command whose class is in the
> > above Set will be bumped to priority 4.
> >
> > This achieves exactly the same behavior with the current well-tested logic.
> >
>
> Mohamed Battisha wrote:
> I do not see it as overkill. It is exactly the same behavior you proposed,
> but instead of using a high-priority queue, we are using a per-job interrupt
> map. I tried as much as possible to implement it within the same theme. [I am
> mapping the proposed behavior to the current design, which is almost 1:1]
>
> * The CallableQueueService would define a PriorityQueue with 4
> priorities instead of 3.
> - Using an InterruptMap within the CallableQueueService.
>
> * The queue() method would ensure that no command being queued has
> priority 4 (forbidding direct use of the highest priority).
> - No change here, as we are not going to use a special queue. Instead,
> while polling the data from the queue, we will check the map first.
>
> * The CallableQueueService would have a new protected method 'Set<Class>
> getInterruptCommandClasses()' that would return the commands that are
> 'interrupt' commands.
> - The interrupt commands are stored in a local set within
> CallableQueueService.
>
> * On queuing, within the queue() method, any command whose class is in
> the above Set will be bumped to priority 4.
> - On queuing, within the queue() method, if the command type belongs to the
> InterruptSet, it will be inserted into the Interrupt Map.
>
>
> The major advantage of the map approach over the queue approach is that we
> avoid job starvation, especially in large deployments in which hundreds of
> thousands of commands are running from thousands of jobs. In this case a few
> jobs with interrupt commands may take full control of the thread pool, causing
> other jobs to starve. The per-job map avoids such behavior.
>
> Alejandro Abdelnur wrote:
> Mohamed,
>
> The CallableQueueService already has anti-starvation logic, handled by the
> callable.concurrency settings. This ensures that a single command never takes
> over all threads. Because of this mechanism, what you point out is not an
> issue.
>
> So again, the alternate solution I'm suggesting restricts the changes to
> one place (the CallableQueueService) and it uses the existing, well-proven
> priority and concurrency mechanism handled by the CallableQueueService.
>
> Thanks.
>
> Alejandro
>
>
> Mohamed Battisha wrote:
> Alejandro
> I am assuming that, in your solution, the anti-starvation logic will not push
> other commands to the higher-priority queue, which will be restricted to the
> interrupt commands only.
> If the anti-starvation logic does push other commands to the higher-priority
> queue, then we are facing the exact same issue again; we are only using four
> queues instead of three.
>
> The changes I made are mainly in the CallableQueueService, with the same
> exact logic you proposed [using a map instead of a higher-priority queue].
> During my tests, I found that some commands do not check for a null
> reference in GetLockKey(). I added these checks to those commands to avoid
> null-reference exceptions in some rare conditions, which gives the impression
> of a bigger change :)
>
> Thanks!
> Mohamed
>
> Alejandro Abdelnur wrote:
> Mohamed,
>
> You are correct; the antiStarvation() method in the PriorityDelayQueue
> should be modified to stop raising priority one level short of the highest.
> This means that line 488 should do:
>
> for (int i = 0; i < queues.length - 2; i++) {
>
> Good catch.
>
> Thxs.
I ran an experiment to compare the two approaches [Interrupt Map vs. the
Hi-Pri Queue].

The goal of the experiment was to see how the Hi-Pri Queue reacts when a few
jobs are locked for a long time.

In the experiment, I had 10 running jobs on a thread pool of 5 threads. Of the
10 jobs, 3 had long-running commands that required the job to be locked for
5 minutes. At the same time, I generated three different interrupt commands
for those three jobs.

Here is what happens with the Hi-Pri Queue approach:
- The interrupt commands reside alone in the Hi-Pri Queue, with no commands
from other jobs [no command is promoted to the Hi-Pri Queue].
- 3 of the 5 threads are occupied by the long-running commands.
- The remaining 2 threads keep serving the Hi-Pri Queue: they try to acquire
the lock, fail, and re-insert the command into the Hi-Pri Queue [effectively
doing nothing].

Based on this, the system is effectively locked for 5 minutes, and the
remaining 7 jobs are simply stalled.
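
The scenario above can be sketched as a toy, single-threaded model in Java. This is only an illustration under stated assumptions, not Oozie code: the class and method names are hypothetical, the queue is assumed to be strictly priority-ordered, and a command that fails to get its lock is assumed to be re-inserted immediately with no delay (the real CallableQueueService may behave differently). The loop stands in for the polls made by the 2 free threads.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Set;

/** Toy model of the Hi-Pri Queue scenario; hypothetical names, not Oozie code. */
class HiPriStallSketch {
    /**
     * Simulates `polls` polls by the free threads and returns how many
     * normal-priority commands got to run.
     */
    static int normalCommandsRun(Set<String> lockedJobs, List<String> interruptJobs,
                                 List<String> normalJobs, int polls) {
        Deque<String> hiPri = new ArrayDeque<>(interruptJobs);
        Deque<String> normal = new ArrayDeque<>(normalJobs);
        int executedNormal = 0;
        for (int i = 0; i < polls; i++) {
            // Assumption: strict priority, so the Hi-Pri queue is always served first.
            if (!hiPri.isEmpty()) {
                String job = hiPri.poll();
                if (lockedJobs.contains(job)) {
                    // Lock is held by a long-running command: re-insert and retry later.
                    hiPri.add(job);
                }
                // Otherwise the interrupt command runs and leaves the queue.
            } else if (!normal.isEmpty()) {
                normal.poll();
                executedNormal++;
            }
        }
        return executedNormal;
    }
}
```

In this model, with three jobs locked and one interrupt command queued for each, normalCommandsRun returns 0 no matter how many polls are simulated. With no locks held, the three interrupts run first and then all seven normal commands run.
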
As I mentioned before, the problem gets worse as the number of jobs increases:
both its probability and its frequency go up.

Also, if some commands hit a locking issue and erroneously fail to release the
lock, the whole system can stall, which should be avoided [in a shared queue
we cannot let an erroneous job impact the whole queue; our design should
isolate it as much as possible].

Using the interrupt map simply avoids such a locking scheme, isolating the
erroneous jobs while achieving the same goal of giving some commands
immediate execution.
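
As a rough illustration of the per-job map idea, here is a minimal Java sketch. It is not the code in the patch: the class, field, and method names are hypothetical, and it assumes that an interrupt command is parked under its job's lock key and picked up when a normal command for the same job is polled.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Toy model of a per-job interrupt map; hypothetical names, not Oozie code. */
class InterruptMapSketch {
    /** Minimal stand-in for a queued command. */
    static final class Cmd {
        final String type;    // command type, e.g. "change"
        final String lockKey; // the job this command would lock
        Cmd(String type, String lockKey) { this.type = type; this.lockKey = lockKey; }
    }

    private final Set<String> interruptTypes;
    private final Map<String, Queue<Cmd>> interruptMap = new HashMap<>();
    private final Queue<Cmd> normalQueue = new ArrayDeque<>();

    InterruptMapSketch(Set<String> interruptTypes) {
        this.interruptTypes = new HashSet<>(interruptTypes);
    }

    /** Interrupt commands are parked per job; everything else is queued as usual. */
    synchronized void queue(Cmd cmd) {
        if (interruptTypes.contains(cmd.type)) {
            interruptMap.computeIfAbsent(cmd.lockKey, k -> new ArrayDeque<>()).add(cmd);
        } else {
            normalQueue.add(cmd);
        }
    }

    /**
     * Picks the next command for a thread. If the polled command's job has a
     * pending interrupt, the interrupt is returned first and the polled
     * command goes back to the end of the queue.
     */
    synchronized Cmd next() {
        Cmd polled = normalQueue.poll();
        if (polled == null) return null;
        Queue<Cmd> pending = interruptMap.get(polled.lockKey);
        if (pending == null) return polled;
        Cmd interrupt = pending.poll();
        if (pending.isEmpty()) interruptMap.remove(polled.lockKey);
        normalQueue.add(polled);
        return interrupt;
    }
}
```

Because interrupts are keyed by job, a thread only ever looks at the interrupts of the job it was about to work on, so pending interrupts for a locked job do not occupy threads that could serve other jobs. One limitation of this toy model is that an interrupt only runs once a normal command for the same job is polled.
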

Again, as I mentioned before, the change for the interrupt map is pretty
simple and we should not be afraid of it. Also, I ran the full set of test
cases to cover different scenarios, and things are pretty solid.

Thanks!
Mohamed
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/3030/#review3851
-----------------------------------------------------------
On 2011-12-09 18:58:56, Mohamed Battisha wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/3030/
> -----------------------------------------------------------
>
> (Updated 2011-12-09 18:58:56)
>
>
> Review request for oozie.
>
>
> Summary
> -------
>
> Oozie continues to materialize new actions after end date modification. The
> main issue is related to how Oozie materializes the actions based on a FIFO
> priority queue. Changing the status of a bundle/coordinator job should take
> higher priority than executing this job.
>
> The main idea is to enable the queue to handle interruptions: once
> you finish what you are currently working on, you should execute the
> interrupt command next.
>
> The issue can be illustrated as follows:
>
> 1. Configure a pipeline to run for 1 hour
> 2. Start the pipeline
> 3. After it starts materializing new actions, change the end time (in my
> example, to 10 minutes after the pipeline starts)
> 4. Monitor the coordinator apps - they will continue to materialize new
> actions past the end time.
>
>
> This addresses bug OOZIE-591.
> https://issues.apache.org/jira/browse/OOZIE-591
>
>
> Diffs
> -----
>
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/PollablePriorityDelayQueue.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/resources/oozie-default.xml
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java
> 1209829
>
> http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
> 1209829
>
> Diff: https://reviews.apache.org/r/3030/diff
>
>
> Testing
> -------
>
> Regression tests for all the commands, in addition to specific tests for the
> Interrupt Driven Map
>
>
> Thanks,
>
> Mohamed
>
>