Author: kamrul
Date: Wed Mar 7 20:48:51 2012
New Revision: 1298107
URL: http://svn.apache.org/viewvc?rev=1298107&view=rev
Log:
OOZIE-684 CoordChangeXCommand already used is thrown while executing interrupt
commands (Mohamed via Mohammad)
Modified:
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
incubator/oozie/trunk/release-log.txt
Modified:
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java?rev=1298107&r1=1298106&r2=1298107&view=diff
==============================================================================
---
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java
(original)
+++
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/Command.java
Wed Mar 7 20:48:51 2012
@@ -627,7 +627,7 @@ public abstract class Command<T, S exten
* @return the mode of execution. true if it is executed as an Interrupt,
* false otherwise
*/
- public boolean getInterruptMode() {
+ public boolean inInterruptMode() {
return false;
}
Modified:
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java?rev=1298107&r1=1298106&r2=1298107&view=diff
==============================================================================
---
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java
(original)
+++
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/XCommand.java
Wed Mar 7 20:48:51 2012
@@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Base class for synchronous and asynchronous commands.
@@ -67,7 +68,7 @@ public abstract class XCommand<T> implem
private String type;
private long createdTime;
private MemoryLocks.LockToken lock;
- private boolean used = false;
+ private AtomicBoolean used = new AtomicBoolean(false);
private boolean inInterrupt = false;
private Map<Long, List<XCommand<?>>> commandQueue;
@@ -192,6 +193,10 @@ public abstract class XCommand<T> implem
* @throws CommandException thrown i the lock could not be obtained.
*/
private void acquireLock() throws InterruptedException, CommandException {
+ if (getEntityKey() == null) {
+ // no lock for null entity key
+ return;
+ }
lock =
Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(),
getLockTimeOut());
if (lock == null) {
Instrumentation instrumentation =
Services.get().get(InstrumentationService.class).get();
@@ -227,10 +232,16 @@ public abstract class XCommand<T> implem
*/
@Override
public final T call() throws CommandException {
- if (used) {
- throw new IllegalStateException(this.getClass().getSimpleName() +
" already used.");
+ if (used.compareAndSet(true, true)) {
+ // avoid throwing needless exceptions in case of interrupts
+ if (this.inInterruptMode()) {
+ LOG.debug("Command [{0}] key [{1}] already used", getName(), getEntityKey());
+ return null;
+ }
+ else {
+ throw new IllegalStateException(this.getClass().getSimpleName() + " already used.");
+ }
}
- used = true;
commandQueue = null;
Instrumentation instrumentation =
Services.get().get(InstrumentationService.class).get();
instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions",
1);
@@ -242,15 +253,18 @@ public abstract class XCommand<T> implem
eagerVerifyPrecondition();
try {
T ret = null;
- if (isLockRequired() && !this.inInterrupt) {
+ if (isLockRequired() && !this.inInterruptMode()) {
Instrumentation.Cron acquireLockCron = new
Instrumentation.Cron();
acquireLockCron.start();
acquireLock();
acquireLockCron.stop();
instrumentation.addCron(INSTRUMENTATION_GROUP, getName() +
".acquireLock", acquireLockCron);
}
- if (!isLockRequired() || (isLockRequired() && lock != null) ||
this.inInterrupt) {
+ // executing interrupts only in case of the lock required commands
+ if (lock != null) {
this.executeInterrupts();
+ }
+ if (!isLockRequired() || (lock != null) || this.inInterruptMode()) {
LOG.debug("Load state for [{0}]", getEntityKey());
loadState();
LOG = XLog.resetPrefix(LOG);
@@ -313,30 +327,29 @@ public abstract class XCommand<T> implem
* Execute them if exist.
*
*/
- protected void executeInterrupts()
- {
- if(!this.inInterrupt && (this.getEntityKey() != null)) {
- CallableQueueService callableQueueService =
Services.get().get(CallableQueueService.class);
- // getting all the list of interrupts to be executed
- List <XCallable<?>> callables =
callableQueueService.checkInterrupts(this.getEntityKey());
-
- if (callables != null) {
- // executing the list of interrupts in the given order of insertion in the list
- for (XCallable<?> callable : callables) {
- LOG.trace("executing callable [{0}]", callable.getName());
- try {
- // executing the callable in interrupt mode
- callable.setInterruptMode(true);
- callable.call();
- LOG.trace("executed callable [{0}]",
callable.getName());
- }
- catch (Exception ex) {
- LOG.warn("exception callable [{0}], {1}",
callable.getName(), ex.getMessage(), ex);
- }
- finally {
- // reseting the interrupt mode to false after the command is executed
- callable.setInterruptMode(false);
- }
+ protected void executeInterrupts() {
+ CallableQueueService callableQueueService =
Services.get().get(CallableQueueService.class);
+ // getting all the list of interrupts to be executed
+ List<XCallable<?>> callables =
callableQueueService.checkInterrupts(this.getEntityKey());
+
+ if (callables != null) {
+ // executing the list of interrupts in the given order of insertion
+ // in the list
+ for (XCallable<?> callable : callables) {
+ LOG.trace("executing interrupt callable [{0}]",
callable.getName());
+ try {
+ // executing the callable in interrupt mode
+ callable.setInterruptMode(true);
+ callable.call();
+ LOG.trace("executed interrupt callable [{0}]",
callable.getName());
+ }
+ catch (Exception ex) {
+ LOG.warn("exception interrupt callable [{0}], {1}",
callable.getName(), ex.getMessage(), ex);
+ }
+ finally {
+ // reseting the interrupt mode to false after the command is
+ // executed
+ callable.setInterruptMode(false);
}
}
}
@@ -452,7 +465,7 @@ public abstract class XCommand<T> implem
* @param used set false to the used
*/
public void resetUsed() {
- this.used = false;
+ this.used.set(false);
}
@@ -487,7 +500,7 @@ public abstract class XCommand<T> implem
* @return the mode of execution. true if it is executed as an Interrupt,
* false otherwise
*/
- public boolean getInterruptMode() {
+ public boolean inInterruptMode() {
return this.inInterrupt;
}
Modified:
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java?rev=1298107&r1=1298106&r2=1298107&view=diff
==============================================================================
---
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
(original)
+++
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
Wed Mar 7 20:48:51 2012
@@ -308,7 +308,7 @@ public class CallableQueueService implem
}
@Override
- public boolean getInterruptMode() {
+ public boolean inInterruptMode() {
return false;
}
Modified:
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java?rev=1298107&r1=1298106&r2=1298107&view=diff
==============================================================================
---
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java
(original)
+++
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/XCallable.java
Wed Mar 7 20:48:51 2012
@@ -79,6 +79,6 @@ public interface XCallable<T> extends Ca
* @return the mode of execution. true if it is executed as an Interrupt,
* false otherwise
*/
- public boolean getInterruptMode();
+ public boolean inInterruptMode();
}
Modified:
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java?rev=1298107&r1=1298106&r2=1298107&view=diff
==============================================================================
---
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java
(original)
+++
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/TestCommand.java
Wed Mar 7 20:48:51 2012
@@ -78,7 +78,7 @@ public class TestCommand extends XTestCa
}
@Override
- public boolean getInterruptMode() {
+ public boolean inInterruptMode() {
return false;
}
Modified:
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java?rev=1298107&r1=1298106&r2=1298107&view=diff
==============================================================================
---
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
(original)
+++
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
Wed Mar 7 20:48:51 2012
@@ -116,7 +116,7 @@ public class TestCallableQueueService ex
}
@Override
- public boolean getInterruptMode() {
+ public boolean inInterruptMode() {
return false;
}
}
@@ -322,7 +322,7 @@ public class TestCallableQueueService ex
}
@Override
- public boolean getInterruptMode() {
+ public boolean inInterruptMode() {
return false;
}
Modified: incubator/oozie/trunk/release-log.txt
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/release-log.txt?rev=1298107&r1=1298106&r2=1298107&view=diff
==============================================================================
--- incubator/oozie/trunk/release-log.txt (original)
+++ incubator/oozie/trunk/release-log.txt Wed Mar 7 20:48:51 2012
@@ -1,5 +1,6 @@
-- Oozie 3.2.0 release
+OOZIE-684 CoordChangeXCommand already used is thrown while executing interrupt commands (Mohamed via Mohammad)
OOZIE-723 Getting rid of the unused Commands classes (mohamed via tucu)
OOZIE-719 Missing java docs for several methods on ActionXCommand.java (Mohamed via Mohammad)
OOZIE-738 HadoopAccessorService configs typo/missed value (tucu)