Author: kamrul
Date: Wed Mar  7 20:48:51 2012
New Revision: 1298107

URL: http://svn.apache.org/viewvc?rev=1298107&view=rev
Log:
OOZIE-684 CoordChangeXCommand already used is thrown while executing interrupt 
commands (Mohamed via Mohammad)

Modified:
    
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java
    
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java
    
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
    
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java
    
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java
    
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
    incubator/oozie/trunk/release-log.txt

Modified: 
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java
URL: 
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java?rev=1298107&r1=1298106&r2=1298107&view=diff
==============================================================================
--- 
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java 
(original)
+++ 
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java 
Wed Mar  7 20:48:51 2012
@@ -627,7 +627,7 @@ public abstract class Command<T, S exten
      * @return the mode of execution. true if it is executed as an Interrupt,
      *         false otherwise
      */
-    public boolean getInterruptMode() {
+    public boolean inInterruptMode() {
         return false;
     }
 

Modified: 
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java
URL: 
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java?rev=1298107&r1=1298106&r2=1298107&view=diff
==============================================================================
--- 
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java 
(original)
+++ 
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java 
Wed Mar  7 20:48:51 2012
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Base class for synchronous and asynchronous commands.
@@ -67,7 +68,7 @@ public abstract class XCommand<T> implem
     private String type;
     private long createdTime;
     private MemoryLocks.LockToken lock;
-    private boolean used = false;
+    private AtomicBoolean used = new AtomicBoolean(false);
     private boolean inInterrupt = false;
 
     private Map<Long, List<XCommand<?>>> commandQueue;
@@ -192,6 +193,10 @@ public abstract class XCommand<T> implem
      * @throws CommandException thrown if the lock could not be obtained.
      */
     private void acquireLock() throws InterruptedException, CommandException {
+               if (getEntityKey() == null) {
+                       // no lock for null entity key
+                       return;
+               }
         lock = 
Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), 
getLockTimeOut());
         if (lock == null) {
             Instrumentation instrumentation = 
Services.get().get(InstrumentationService.class).get();
@@ -227,10 +232,16 @@ public abstract class XCommand<T> implem
      */
     @Override
     public final T call() throws CommandException {
-        if (used) {
-            throw new IllegalStateException(this.getClass().getSimpleName() + 
" already used.");
+        if (used.compareAndSet(true, true)) {
+            // avoid throwing needless exceptions in case of interrupts
+            if (this.inInterruptMode()) {
+                LOG.debug("Command [{0}] key [{1}]  already used", getName(), 
getEntityKey());
+                return null;
+            }
+            else {
+                throw new 
IllegalStateException(this.getClass().getSimpleName() + " already used.");
+            }
         }
-        used = true;
         commandQueue = null;
         Instrumentation instrumentation = 
Services.get().get(InstrumentationService.class).get();
         instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 
1);
@@ -242,15 +253,18 @@ public abstract class XCommand<T> implem
             eagerVerifyPrecondition();
             try {
                 T ret = null;
-                if (isLockRequired() && !this.inInterrupt) {
+                if (isLockRequired() && !this.inInterruptMode()) {
                     Instrumentation.Cron acquireLockCron = new 
Instrumentation.Cron();
                     acquireLockCron.start();
                     acquireLock();
                     acquireLockCron.stop();
                     instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + 
".acquireLock", acquireLockCron);
                 }
-                if (!isLockRequired() || (isLockRequired() && lock != null) || 
this.inInterrupt) {
+                // executing interrupts only in case of the lock required 
commands
+                if (lock != null) {
                     this.executeInterrupts();
+                }
+                if (!isLockRequired() || (lock != null) || 
this.inInterruptMode()) {
                     LOG.debug("Load state for [{0}]", getEntityKey());
                     loadState();
                     LOG = XLog.resetPrefix(LOG);
@@ -313,30 +327,29 @@ public abstract class XCommand<T> implem
      * Execute them if exist.
      *
      */
-    protected void executeInterrupts()
-    {
-        if(!this.inInterrupt && (this.getEntityKey() != null)) {
-            CallableQueueService callableQueueService = 
Services.get().get(CallableQueueService.class);
-            // getting all the list of interrupts to be executed
-            List <XCallable<?>> callables = 
callableQueueService.checkInterrupts(this.getEntityKey());
-
-            if (callables != null) {
-                // executing the list of interrupts in the given order of 
insertion in the list
-                for (XCallable<?> callable : callables) {
-                    LOG.trace("executing callable [{0}]", callable.getName());
-                    try {
-                        // executing the callable in interrupt mode
-                        callable.setInterruptMode(true);
-                        callable.call();
-                        LOG.trace("executed callable [{0}]", 
callable.getName());
-                    }
-                    catch (Exception ex) {
-                     LOG.warn("exception callable [{0}], {1}", 
callable.getName(), ex.getMessage(), ex);
-                    }
-                    finally {
-                        // reseting the interrupt mode to false after the 
command is executed
-                        callable.setInterruptMode(false);
-                    }
+    protected void executeInterrupts() {
+        CallableQueueService callableQueueService = 
Services.get().get(CallableQueueService.class);
+        // getting all the list of interrupts to be executed
+        List<XCallable<?>> callables = 
callableQueueService.checkInterrupts(this.getEntityKey());
+
+        if (callables != null) {
+            // executing the list of interrupts in the given order of insertion
+            // in the list
+            for (XCallable<?> callable : callables) {
+                LOG.trace("executing interrupt callable [{0}]", 
callable.getName());
+                try {
+                    // executing the callable in interrupt mode
+                    callable.setInterruptMode(true);
+                    callable.call();
+                    LOG.trace("executed interrupt callable [{0}]", 
callable.getName());
+                }
+                catch (Exception ex) {
+                    LOG.warn("exception interrupt callable [{0}], {1}", 
callable.getName(), ex.getMessage(), ex);
+                }
+                finally {
+                    // resetting the interrupt mode to false after the command 
is
+                    // executed
+                    callable.setInterruptMode(false);
                 }
             }
         }
@@ -452,7 +465,7 @@ public abstract class XCommand<T> implem
      * @param used set false to the used
      */
     public void resetUsed() {
-        this.used = false;
+        this.used.set(false);
     }
 
 
@@ -487,7 +500,7 @@ public abstract class XCommand<T> implem
      * @return the mode of execution. true if it is executed as an Interrupt,
      *         false otherwise
      */
-    public boolean getInterruptMode() {
+    public boolean inInterruptMode() {
         return this.inInterrupt;
     }
 

Modified: 
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
URL: 
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java?rev=1298107&r1=1298106&r2=1298107&view=diff
==============================================================================
--- 
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
 (original)
+++ 
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
 Wed Mar  7 20:48:51 2012
@@ -308,7 +308,7 @@ public class CallableQueueService implem
         }
 
         @Override
-        public boolean getInterruptMode() {
+        public boolean inInterruptMode() {
             return false;
         }
 

Modified: 
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java
URL: 
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java?rev=1298107&r1=1298106&r2=1298107&view=diff
==============================================================================
--- 
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java 
(original)
+++ 
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java 
Wed Mar  7 20:48:51 2012
@@ -79,6 +79,6 @@ public interface XCallable<T> extends Ca
      * @return the mode of execution. true if it is executed as an Interrupt,
      *         false otherwise
      */
-    public boolean getInterruptMode();
+    public boolean inInterruptMode();
 
 }

Modified: 
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java
URL: 
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java?rev=1298107&r1=1298106&r2=1298107&view=diff
==============================================================================
--- 
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java
 (original)
+++ 
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java
 Wed Mar  7 20:48:51 2012
@@ -78,7 +78,7 @@ public class TestCommand extends XTestCa
         }
 
         @Override
-        public boolean getInterruptMode() {
+        public boolean inInterruptMode() {
             return false;
         }
 

Modified: 
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
URL: 
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java?rev=1298107&r1=1298106&r2=1298107&view=diff
==============================================================================
--- 
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
 (original)
+++ 
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
 Wed Mar  7 20:48:51 2012
@@ -116,7 +116,7 @@ public class TestCallableQueueService ex
         }
 
         @Override
-        public boolean getInterruptMode() {
+        public boolean inInterruptMode() {
             return false;
         }
     }
@@ -322,7 +322,7 @@ public class TestCallableQueueService ex
         }
 
         @Override
-        public boolean getInterruptMode() {
+        public boolean inInterruptMode() {
             return false;
         }
 

Modified: incubator/oozie/trunk/release-log.txt
URL: 
http://svn.apache.org/viewvc/incubator/oozie/trunk/release-log.txt?rev=1298107&r1=1298106&r2=1298107&view=diff
==============================================================================
--- incubator/oozie/trunk/release-log.txt (original)
+++ incubator/oozie/trunk/release-log.txt Wed Mar  7 20:48:51 2012
@@ -1,5 +1,6 @@
 -- Oozie 3.2.0 release
 
+OOZIE-684 CoordChangeXCommand already used is thrown while executing interrupt 
commands (Mohamed via Mohammad)
 OOZIE-723 Getting rid of the unused Commands classes (mohamed via tucu)
 OOZIE-719 Missing java docs for several methods on ActionXCommand.java 
(Mohamed via Mohammad)
 OOZIE-738 HadoopAccessorService configs typo/missed value (tucu)


Reply via email to