Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java?rev=1232709&r1=1232708&r2=1232709&view=diff ============================================================================== --- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java (original) +++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java Wed Jan 18 01:09:12 2012 @@ -18,11 +18,16 @@ package org.apache.oozie.service; import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.XCommand; import org.apache.oozie.test.XTestCase; import org.apache.oozie.util.XCallable; @@ -100,6 +105,86 @@ public class TestCallableQueueService ex public String getKey() { return this.key; } + + @Override + public String getEntityKey() { + return null; + } + + @Override + public void setInterruptMode(boolean mode) { + } + + @Override + public boolean getInterruptMode() { + return false; + } + } + + public static class ExtendedXCommand extends XCommand<Void> { + private boolean lockRequired = true; + public String lockKey; + public long wait; + long executed; + + public ExtendedXCommand(String key, String type, int priority, int wait, String lockKey, boolean lockRequired) { + super(key, type, priority, false); + this.lockRequired = lockRequired; + this.lockKey = lockKey; + this.wait = wait; + } + + public ExtendedXCommand(String key, String type, int priority, int wait, String lockKey) { + super(key, type, priority, false); + this.lockKey = lockKey; + this.wait = wait; + } + + @Override + protected boolean isLockRequired() { + return this.lockRequired; + } + + @Override + protected boolean isReQueueRequired() { + return false; + } + + @Override + public String getEntityKey() { + return this.lockKey; + } + + @Override + protected void eagerLoadState() { + } + + @Override + protected void eagerVerifyPrecondition() throws CommandException { + } + + @Override + protected void loadState() { + } + + @Override + protected void verifyPrecondition() throws CommandException { + } + + @Override + protected Void execute() throws CommandException { + if (executed == 0) { + try { + Thread.sleep(this.wait); + } + catch (InterruptedException exp) { + throw new CommandException(ErrorCode.ETEST); + } + executed = System.currentTimeMillis(); + ; + } + return null; + } } public void testQueuing() throws Exception { @@ -217,10 +302,15 @@ public class TestCallableQueueService ex return "type"; } - @Override - public String getKey() { - return "name" + "_" + UUID.randomUUID(); - } + @Override + public String getKey() { + return "name" + "_" + UUID.randomUUID(); + } + + @Override + public String getEntityKey() { + return null; + } @Override public long getCreatedTime() { @@ -228,6 +318,15 @@ public class TestCallableQueueService ex } @Override + public void setInterruptMode(boolean mode) { + } + + @Override + public boolean getInterruptMode() { + return false; + } + + @Override public Void call() throws Exception { incr(); Thread.sleep(100); @@ -599,4 +698,230 @@ public class TestCallableQueueService ex services.destroy(); } + /** + * Testing the interrupts by introducing an interrupt command within a set + * of 10 commands and assuring it will be executed first + */ + public void testInterrupt() throws Exception { + EXEC_ORDER = new AtomicLong(); + setSystemProperty(CallableQueueService.CONF_THREADS, "1"); + setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_TYPES, "testKill"); + Services services = new Services(); + services.init(); + + CallableQueueService queueservice = services.get(CallableQueueService.class); + final ExtendedXCommand initialCallable = new ExtendedXCommand("initialKey", "initialType", 2, 200, + "initialLockKey"); + final List<ExtendedXCommand> callables = new ArrayList<ExtendedXCommand>(); + for (int i = 0; i < 10; i++) { + callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey")); + } + + final ExtendedXCommand intCallable = new ExtendedXCommand("keyInt", "testKill", 0, 200, "lockKey"); + + queueservice.queue(initialCallable); + for (int i = 0; i < 10; i++) { + queueservice.queue(callables.get(i)); + } + + queueservice.queue(intCallable); + + waitFor(3000, new Predicate() { + public boolean evaluate() throws Exception { + boolean retValue = initialCallable.executed != 0 && intCallable.executed != 0; + for (ExtendedXCommand c : callables) { + retValue = retValue && c.executed != 0; + } + return retValue; + } + }); + + assertTrue(initialCallable.executed > 0); + assertTrue(intCallable.executed > 0); + assertTrue(intCallable.executed < callables.get(5).executed); + services.destroy(); + } + + /* + * Introducing an interrupt with different keys and assure it will be + * executed in order regardless of the existence of an interrupt command in + * the mix. + */ + public void testInterruptsWithDistinguishedLockKeys() throws Exception { + EXEC_ORDER = new AtomicLong(); + setSystemProperty(CallableQueueService.CONF_THREADS, "1"); + setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_TYPES, "testKill"); + Services services = new Services(); + services.init(); + + CallableQueueService queueservice = services.get(CallableQueueService.class); + + final ExtendedXCommand initialCallable = new ExtendedXCommand("initialKey", "initialType", 2, 200, + "initialLockKey"); + final List<ExtendedXCommand> callables = new ArrayList<ExtendedXCommand>(); + for (int i = 0; i < 10; i++) { + callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey" + i)); + } + + final ExtendedXCommand intCallable = new ExtendedXCommand("keyInt", "testKill", 0, 100, "lockKey"); + + queueservice.queue(initialCallable); + for (int i = 0; i < 10; i++) { + queueservice.queue(callables.get(i)); + } + + queueservice.queue(intCallable); + + waitFor(6000, new Predicate() { + public boolean evaluate() throws Exception { + boolean retValue = initialCallable.executed != 0 && intCallable.executed != 0; + for (ExtendedXCommand c : callables) { + retValue = retValue && c.executed != 0; + } + return retValue; + } + }); + + assertTrue(initialCallable.executed > 0); + assertTrue(intCallable.executed > 0); + assertTrue(intCallable.executed > callables.get(5).executed); + + services.destroy(); + } + + /* + * assuring an interrupt command will be executed before a composite + * callable with the same lock key + */ + public void testInterruptsWithCompositeCallable() throws Exception { + EXEC_ORDER = new AtomicLong(); + setSystemProperty(CallableQueueService.CONF_THREADS, "1"); + setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_TYPES, "testKill"); + Services services = new Services(); + services.init(); + + CallableQueueService queueservice = services.get(CallableQueueService.class); + final ExtendedXCommand initialCallable = new ExtendedXCommand("initialKey", "initialType", 2, 200, + "initialLockKey"); + final List<ExtendedXCommand> callables = new ArrayList<ExtendedXCommand>(); + + for (int i = 0; i < 10; i++) { + callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey")); + } + + final ExtendedXCommand intCallable = new ExtendedXCommand("key5", "testKill", 0, 200, "lockKey"); + + queueservice.queue(initialCallable); + queueservice.queueSerial((List<? extends XCallable<?>>) (callables), 0); + queueservice.queue(intCallable); + + waitFor(3000, new Predicate() { + public boolean evaluate() throws Exception { + boolean retValue = initialCallable.executed != 0 && intCallable.executed != 0; + for (ExtendedXCommand c : callables) { + retValue = retValue && c.executed != 0; + } + return retValue; + } + }); + + assertTrue(initialCallable.executed > 0); + assertTrue(intCallable.executed > 0); + for (ExtendedXCommand c : callables) { + assertTrue(intCallable.executed < c.executed); + } + + services.destroy(); + } + + /* + * Testing an interrupt commands inside a composite callable Assuring it is + * executed before the others + */ + public void testInterruptsInCompositeCallable() throws Exception { + EXEC_ORDER = new AtomicLong(); + setSystemProperty(CallableQueueService.CONF_THREADS, "1"); + setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_TYPES, "testKill"); + Services services = new Services(); + services.init(); + + CallableQueueService queueservice = services.get(CallableQueueService.class); + final ExtendedXCommand initialCallable = new ExtendedXCommand("initialKey", "initialType", 2, 200, + "initialLockKey"); + final List<ExtendedXCommand> callables = new ArrayList<ExtendedXCommand>(); + + for (int i = 0; i < 5; i++) { + callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey")); + } + callables.add(new ExtendedXCommand("key" + 5, "testKill", 1, 100, "lockKey")); + for (int i = 6; i < 10; i++) { + callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey")); + } + + queueservice.queue(initialCallable); + queueservice.queueSerial((List<? extends XCallable<?>>) (callables), 0); + + waitFor(3000, new Predicate() { + public boolean evaluate() throws Exception { + boolean retValue = initialCallable.executed != 0; + for (ExtendedXCommand c : callables) { + retValue = retValue && c.executed != 0; + } + return retValue; + } + }); + + assertTrue(initialCallable.executed > 0); + assertTrue(callables.get(1).executed > callables.get(5).executed); + + services.destroy(); + } + + /* + * Assuring the interrupts will not be inserted in the map when it reached + * the max size + */ + public void testMaxInterruptMapSize() throws Exception { + EXEC_ORDER = new AtomicLong(); + setSystemProperty(CallableQueueService.CONF_THREADS, "1"); + setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_TYPES, "testKill"); + setSystemProperty(CallableQueueService.CONF_CALLABLE_INTERRUPT_MAP_MAX_SIZE, "0"); + Services services = new Services(); + services.init(); + + CallableQueueService queueservice = services.get(CallableQueueService.class); + + final ExtendedXCommand initialCallable = new ExtendedXCommand("initialKey", "initialType", 2, 100, + "initialLockKey"); + final List<ExtendedXCommand> callables = new ArrayList<ExtendedXCommand>(); + for (int i = 0; i < 10; i++) { + callables.add(new ExtendedXCommand("key" + i, "type" + i, 1, 100, "lockKey")); + } + + final ExtendedXCommand intCallable = new ExtendedXCommand("keyInt", "testKill", 0, 100, "lockKey"); + + queueservice.queue(initialCallable); + for (int i = 0; i < 10; i++) { + queueservice.queue(callables.get(i)); + } + + queueservice.queue(intCallable); + + waitFor(5000, new Predicate() { + public boolean evaluate() throws Exception { + boolean retValue = initialCallable.executed != 0 && intCallable.executed != 0; + for (ExtendedXCommand c : callables) { + retValue = retValue && c.executed != 0; + } + return retValue; + } + }); + + assertTrue(initialCallable.executed > 0); + assertTrue(intCallable.executed > 0); + assertTrue(intCallable.executed > callables.get(5).executed); + + services.destroy(); + } + }
Modified: incubator/oozie/trunk/release-log.txt URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/release-log.txt?rev=1232709&r1=1232708&r2=1232709&view=diff ============================================================================== --- incubator/oozie/trunk/release-log.txt (original) +++ incubator/oozie/trunk/release-log.txt Wed Jan 18 01:09:12 2012 @@ -1,5 +1,6 @@ -- Oozie 3.2.0 release +OOZIE-591: Oozie continues to materialize new actions after end date modification (Mohamed Battisha vis Angelo) OOZIE-639 Hive sharelib POM must exclude hadoop-core. (tucu) OOZIE-635 ShellMain closes the STD/ERR stream while shell is processing. (tucu) OOZIE-629 Oozie server to prevent usage of dataset initial-instance earlier than the system-defined default value.(Mona via Mohammad)
