> On 2011-12-12 20:44:14, Alejandro Abdelnur wrote:
> > The proposed approach seems like overkill when the following change would
> > achieve the same result without introducing new concurrent logic:
> >
> > * The CallableQueueService would define a PriorityQueue with 4 priorities
> >   instead of 3.
> > * The queue() method would ensure that no command being queued has
> >   priority 4 (forbidding direct use of the highest priority).
> > * The CallableQueueService would add a new protected method
> >   'Set<Class> getInterruptCommandClasses()' that would return the commands
> >   that are 'interrupt' commands.
> > * On queuing, within the queue() command, any command whose class is in the
> >   above Set will be bumped to priority 4.
> >
> > This achieves exactly the same behavior with the current, well-tested logic.
> >
>
> Mohamed Battisha wrote:
>     I do not see it as overkill. It is exactly the same behavior you proposed,
>     but instead of using a high-priority queue, we are using a per-job
>     interrupt map. I tried as much as possible to implement it within the same
>     theme. [I am mapping the proposed behavior onto the current design, which
>     is almost 1:1.]
>
>     * The CallableQueueService would define a PriorityQueue with 4 priorities
>       instead of 3.
>       - Using an InterruptMap within the CallableQueueService.
>
>     * The queue() method would ensure that no command being queued has
>       priority 4 (forbidding direct use of the highest priority).
>       - No change here, as we are not going to use a special queue. Instead,
>         while polling data from the queue, we will check the map first.
>
>     * The CallableQueueService would add a new protected method
>       'Set<Class> getInterruptCommandClasses()' that would return the
>       commands that are 'interrupt' commands.
>       - The interrupt commands are stored in a local set within
>         CallableQueueService.
>
>     * On queuing, within the queue() command, any command whose class is in
>       the above Set will be bumped to priority 4.
>       - On queuing, within the queue command, if the command type belongs to
>         the InterruptSet, it will be inserted into the Interrupt Map.
>
>     The major advantage of the map approach over the queue approach is that
>     it avoids job starvation, especially in large deployments in which
>     hundreds of thousands of commands are running from thousands of jobs. In
>     that case, a few jobs with interrupt commands may take full control of
>     the thread pool, causing other jobs to starve. The per-job map avoids
>     such behavior.
>
> Alejandro Abdelnur wrote:
>     Mohamed,
>
>     The CallableQueueService already has anti-starvation logic, handled by
>     the callable.concurrency settings. This ensures that a single command
>     never takes over all threads. Because of this mechanism, what you point
>     out as an issue is not one.
>
>     So again, the alternate solution I'm suggesting restricts the changes to
>     one place (the CallableQueueService), and it uses the existing,
>     well-proven priority and concurrency mechanism handled by the
>     CallableQueueService.
>
>     Thanks.
>
>     Alejandro
>
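For readers following the thread, the four-priority alternative Alejandro describes can be sketched roughly as follows. This is a minimal, hypothetical sketch: the class name InterruptBumpingQueue, the generic command type, and the plain Set<Class<?>> standing in for getInterruptCommandClasses() are illustrative assumptions, not Oozie's CallableQueueService API, and the delay and concurrency handling of the real queue are omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch (not Oozie's real CallableQueueService): callers may queue
 * at priorities 1..3, priority 4 is reserved, and any command whose class is in
 * the interrupt set is bumped to priority 4 on queuing.
 */
class InterruptBumpingQueue<C> {
    static final int MAX_CALLER_PRIORITY = 3;
    static final int INTERRUPT_PRIORITY = 4;

    // One FIFO per priority level; index == priority, index 0 is unused.
    private final List<Deque<C>> levels = new ArrayList<>();
    // Stands in for the proposed getInterruptCommandClasses() (assumption).
    private final Set<Class<?>> interruptClasses;

    InterruptBumpingQueue(Set<Class<?>> interruptClasses) {
        this.interruptClasses = interruptClasses;
        for (int p = 0; p <= INTERRUPT_PRIORITY; p++) {
            levels.add(new ArrayDeque<C>());
        }
    }

    /** Rejects direct use of the reserved top priority, then bumps interrupt commands. */
    synchronized void queue(C command, int priority) {
        if (priority < 1 || priority > MAX_CALLER_PRIORITY) {
            throw new IllegalArgumentException(
                    "priority must be between 1 and " + MAX_CALLER_PRIORITY);
        }
        int effective = interruptClasses.contains(command.getClass())
                ? INTERRUPT_PRIORITY
                : priority;
        levels.get(effective).addLast(command);
    }

    /** Returns the oldest command at the highest non-empty level, or null if all are empty. */
    synchronized C poll() {
        for (int p = INTERRUPT_PRIORITY; p >= 1; p--) {
            C head = levels.get(p).pollFirst();
            if (head != null) {
                return head;
            }
        }
        return null;
    }
}
```

The only new pieces in this sketch are the reserved top level and the class lookup in queue(); polling stays an ordinary highest-priority-first scan, which is the "no new concurrent logic" point made in the thread.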

Alejandro, I assumed from your solution that the anti-starvation logic will not
push other commands into the higher-priority queue, which would be reserved for
interrupt commands. If the anti-starvation logic does push other commands into
the higher-priority queue, then we face exactly the same issue again; we would
only be using four queues instead of three.

The changes I made are mainly in the CallableQueueService, with exactly the same
logic you proposed [using a map instead of a higher-priority queue]. During my
tests, I found that some commands do not check for null references in
GetLockKey(). I added these checks to those commands to avoid null-reference
exceptions in some rare conditions, which gives the impression of a bigger
change :)

Thanks!
Mohamed


- Mohamed


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/3030/#review3851
-----------------------------------------------------------


On 2011-12-09 18:58:56, Mohamed Battisha wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/3030/
> -----------------------------------------------------------
> 
> (Updated 2011-12-09 18:58:56)
> 
> 
> Review request for oozie.
> 
> 
> Summary
> -------
> 
> Oozie continues to materialize new actions after the end date is modified.
> The main issue is related to how Oozie materializes actions from a FIFO
> priority queue. Changing the status of a bundle/coordinator job should take
> higher priority than executing that job.
> 
> The main idea is to enable the queue to handle interruptions: once a worker
> finishes what it is currently working on, it should execute the interrupting
> command next.
> 
> The issue can be illustrated as follows:
> 
> 1. Configure a pipeline to run for 1 hour.
> 2. Start the pipeline.
> 3. After it starts materializing new actions, change the end time (in my
>    example, to 10 minutes after the pipeline starts).
> 4. Monitor the coordinator apps: they will continue to materialize new
>    actions past the end time.
> 
> 
> This addresses bug OOZIE-591.
>     https://issues.apache.org/jira/browse/OOZIE-591
> 
> 
> Diffs
> -----
> 
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleJobResumeXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundlePauseXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleRerunXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/bundle/BundleUnpauseXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionNotificationXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPauseXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordUnpauseXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/PollablePriorityDelayQueue.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/main/resources/oozie-default.xml 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java 1209829
>   http://svn.apache.org/repos/asf/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java 1209829
> 
> Diff: https://reviews.apache.org/r/3030/diff
> 
> 
> Testing
> -------
> 
> Regression tests for all the commands, in addition to specific tests for the
> interrupt-driven map.
> 
> 
> Thanks,
> 
> Mohamed
> 
>
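The per-job interrupt map described in this request can be sketched in the same spirit. The thread does not spell out exactly how the map is consulted while polling, so this sketch assumes that before handing out the regular FIFO head, the queue checks whether that head's job has a pending interrupt and, if so, serves the interrupt first and keeps the regular command at the head. The class name PerJobInterruptQueue, the String job IDs, and the generic command type are hypothetical, not the patch's actual code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch of a per-job interrupt map (not the actual patch):
 * interrupt commands are parked per job ID and served before the regular
 * FIFO head whenever that head belongs to the same job.
 */
class PerJobInterruptQueue<C> {
    /** A regular command tagged with the job it belongs to. */
    private static final class JobCommand<T> {
        final String jobId;
        final T command;

        JobCommand(String jobId, T command) {
            this.jobId = jobId;
            this.command = command;
        }
    }

    private final Deque<JobCommand<C>> regular = new ArrayDeque<>();
    // Insertion-ordered so leftover interrupts drain in the order their jobs were first seen.
    private final Map<String, Deque<C>> interrupts = new LinkedHashMap<>();
    private final Set<Class<?>> interruptClasses;

    PerJobInterruptQueue(Set<Class<?>> interruptClasses) {
        this.interruptClasses = interruptClasses;
    }

    /** Interrupt commands go into their job's map entry; everything else into the FIFO. */
    synchronized void queue(String jobId, C command) {
        if (interruptClasses.contains(command.getClass())) {
            interrupts.computeIfAbsent(jobId, k -> new ArrayDeque<>()).addLast(command);
        } else {
            regular.addLast(new JobCommand<>(jobId, command));
        }
    }

    /** Checks the map for the head's job first; returns null when nothing is queued. */
    synchronized C poll() {
        JobCommand<C> head = regular.pollFirst();
        if (head == null) {
            // No regular work left: drain any remaining interrupts.
            Iterator<String> jobs = interrupts.keySet().iterator();
            return jobs.hasNext() ? takeInterrupt(jobs.next()) : null;
        }
        if (interrupts.containsKey(head.jobId)) {
            regular.addFirst(head); // keep the regular command at the head for the next poll
            return takeInterrupt(head.jobId);
        }
        return head.command;
    }

    // Map entries are removed once empty, so containsKey implies a pending interrupt.
    private C takeInterrupt(String jobId) {
        Deque<C> pending = interrupts.get(jobId);
        C command = pending.pollFirst();
        if (pending.isEmpty()) {
            interrupts.remove(jobId);
        }
        return command;
    }
}
```

Under this assumption, interrupts are scoped to their own job: an interrupt for job A jumps ahead of A's pending regular command, while commands from other jobs keep their FIFO positions instead of waiting behind every interrupt in the system.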
