Author: kamrul
Date: Tue May  1 04:44:11 2012
New Revision: 1332521

URL: http://svn.apache.org/viewvc?rev=1332521&view=rev
Log:
OOZIE-819: Interrupt map doesn't have unique set of commands for a given 
jobid(virag via Mohammad)

Modified:
    
incubator/oozie/branches/branch-3.2/core/src/main/java/org/apache/oozie/command/XCommand.java
    
incubator/oozie/branches/branch-3.2/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
    
incubator/oozie/branches/branch-3.2/core/src/main/resources/oozie-default.xml
    incubator/oozie/branches/branch-3.2/release-log.txt

Modified: 
incubator/oozie/branches/branch-3.2/core/src/main/java/org/apache/oozie/command/XCommand.java
URL: 
http://svn.apache.org/viewvc/incubator/oozie/branches/branch-3.2/core/src/main/java/org/apache/oozie/command/XCommand.java?rev=1332521&r1=1332520&r2=1332521&view=diff
==============================================================================
--- 
incubator/oozie/branches/branch-3.2/core/src/main/java/org/apache/oozie/command/XCommand.java
 (original)
+++ 
incubator/oozie/branches/branch-3.2/core/src/main/java/org/apache/oozie/command/XCommand.java
 Tue May  1 04:44:11 2012
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -33,6 +33,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -193,17 +194,16 @@ public abstract class XCommand<T> implem
      * @throws CommandException thrown i the lock could not be obtained.
      */
     private void acquireLock() throws InterruptedException, CommandException {
-               if (getEntityKey() == null) {
-                       // no lock for null entity key
-                       return;
-               }
+        if (getEntityKey() == null) {
+            // no lock for null entity key
+            return;
+        }
         lock = 
Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), 
getLockTimeOut());
         if (lock == null) {
             Instrumentation instrumentation = 
Services.get().get(InstrumentationService.class).get();
             instrumentation.incr(INSTRUMENTATION_GROUP, getName() + 
".lockTimeOut", 1);
             if (isReQueueRequired()) {
                 //if not acquire the lock, re-queue itself with default delay
-                resetUsed();
                 queue(this, getRequeueDelay());
                 LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and 
requeue itself [{2}]", this.toString(), getLockTimeOut(), getName());
             } else {
@@ -232,16 +232,11 @@ public abstract class XCommand<T> implem
      */
     @Override
     public final T call() throws CommandException {
-        if (used.compareAndSet(true, true)) {
-            // avoid throwing needless exceptions in case of interrupts
-            if (this.inInterruptMode()) {
-                LOG.debug("Command [{0}] key [{1}]  already used", getName(), 
getEntityKey());
-                return null;
-            }
-            else {
-                throw new 
IllegalStateException(this.getClass().getSimpleName() + " already used.");
-            }
+        if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) && 
used.get()) {
+            LOG.debug("Command [{0}] key [{1}]  already used for [{2}]", 
getName(), getEntityKey(), this.toString());
+            return null;
         }
+
         commandQueue = null;
         Instrumentation instrumentation = 
Services.get().get(InstrumentationService.class).get();
         instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 
1);
@@ -264,7 +259,13 @@ public abstract class XCommand<T> implem
                 if (lock != null) {
                     this.executeInterrupts();
                 }
+
                 if (!isLockRequired() || (lock != null) || 
this.inInterruptMode()) {
+                    if 
(CallableQueueService.INTERRUPT_TYPES.contains(this.getType())
+                            && !used.compareAndSet(false, true)) {
+                        LOG.debug("Command [{0}] key [{1}]  already executed 
for [{2}]", getName(), getEntityKey(), this.toString());
+                        return null;
+                    }
                     LOG.debug("Load state for [{0}]", getEntityKey());
                     loadState();
                     LOG = XLog.resetPrefix(LOG);
@@ -330,7 +331,7 @@ public abstract class XCommand<T> implem
     protected void executeInterrupts() {
         CallableQueueService callableQueueService = 
Services.get().get(CallableQueueService.class);
         // getting all the list of interrupts to be executed
-        List<XCallable<?>> callables = 
callableQueueService.checkInterrupts(this.getEntityKey());
+        Set<XCallable<?>> callables = 
callableQueueService.checkInterrupts(this.getEntityKey());
 
         if (callables != null) {
             // executing the list of interrupts in the given order of insertion

Modified: 
incubator/oozie/branches/branch-3.2/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
URL: 
http://svn.apache.org/viewvc/incubator/oozie/branches/branch-3.2/core/src/main/java/org/apache/oozie/service/CallableQueueService.java?rev=1332521&r1=1332520&r2=1332521&view=diff
==============================================================================
--- 
incubator/oozie/branches/branch-3.2/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
 (original)
+++ 
incubator/oozie/branches/branch-3.2/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
 Tue May  1 04:44:11 2012
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -90,9 +91,9 @@ public class CallableQueueService implem
 
     private final Map<String, Date> uniqueCallables = new 
ConcurrentHashMap<String, Date>();
 
-    private final ConcurrentHashMap<String, List<XCallable<?>>> 
interruptCommandsMap = new ConcurrentHashMap<String, List<XCallable<?>>>();
+    private final ConcurrentHashMap<String, Set<XCallable<?>>> 
interruptCommandsMap = new ConcurrentHashMap<String, Set<XCallable<?>>>();
 
-    private final HashSet<String> interruptTypes = new HashSet<String>();
+    public static final HashSet<String> INTERRUPT_TYPES = new 
HashSet<String>();
 
     private int interruptMapMaxSize;
 
@@ -446,7 +447,8 @@ public class CallableQueueService implem
         boolean callableNextEligible = 
conf.getBoolean(CONF_CALLABLE_NEXT_ELIGIBLE, true);
 
         for (String type : 
conf.getStringCollection(CONF_CALLABLE_INTERRUPT_TYPES)) {
-            interruptTypes.add(type);
+            log.debug("Adding interrupt type [{0}]", type);
+            INTERRUPT_TYPES.add(type);
         }
 
         if (!callableNextEligible) {
@@ -686,7 +688,7 @@ public class CallableQueueService implem
      * exist a List of Interrupt Callable for the same lock key will 
bereturned,
      * otherwise it will return null
      */
-    public List<XCallable<?>> checkInterrupts(String lockKey) {
+    public Set<XCallable<?>> checkInterrupts(String lockKey) {
 
         if (lockKey != null) {
             return interruptCommandsMap.remove(lockKey);
@@ -703,12 +705,12 @@ public class CallableQueueService implem
     public void checkInterruptTypes(XCallable<?> callable) {
         if ((callable instanceof CompositeCallable) && (((CompositeCallable) 
callable).getCallables() != null)) {
             for (XCallable<?> singleCallable : ((CompositeCallable) 
callable).getCallables()) {
-                if (interruptTypes.contains(singleCallable.getType())) {
+                if (INTERRUPT_TYPES.contains(singleCallable.getType())) {
                     insertCallableIntoInterruptMap(singleCallable);
                 }
             }
         }
-        else if (interruptTypes.contains(callable.getType())) {
+        else if (INTERRUPT_TYPES.contains(callable.getType())) {
             insertCallableIntoInterruptMap(callable);
         }
     }
@@ -721,14 +723,16 @@ public class CallableQueueService implem
      */
     public void insertCallableIntoInterruptMap(XCallable<?> callable) {
         if (interruptCommandsMap.size() < interruptMapMaxSize) {
-            List<XCallable<?>> newList = Collections.synchronizedList(new 
ArrayList<XCallable<?>>());
-            List<XCallable<?>> interruptList = 
interruptCommandsMap.putIfAbsent(callable.getEntityKey(), newList);
-            if (interruptList == null) {
-                interruptList = newList;
-            }
-            interruptList.add(callable);
-            log.trace("Inserting an interrupt element [{1}] to the interrupt 
map", interruptCommandsMap.size(),
-                    callable.toString());
+            Set<XCallable<?>> newSet = Collections.synchronizedSet(new 
LinkedHashSet<XCallable<?>>());
+            Set<XCallable<?>> interruptSet = 
interruptCommandsMap.putIfAbsent(callable.getEntityKey(), newSet);
+            if (interruptSet == null) {
+                interruptSet = newSet;
+            }
+            if (interruptSet.add(callable)) {
+                log.trace("Inserting an interrupt element [{0}] to the 
interrupt map", callable.toString());
+            } else {
+                log.trace("Interrupt element [{0}] already present", 
callable.toString());
+            }
         }
         else {
             log.warn(

Modified: 
incubator/oozie/branches/branch-3.2/core/src/main/resources/oozie-default.xml
URL: 
http://svn.apache.org/viewvc/incubator/oozie/branches/branch-3.2/core/src/main/resources/oozie-default.xml?rev=1332521&r1=1332520&r2=1332521&view=diff
==============================================================================
--- 
incubator/oozie/branches/branch-3.2/core/src/main/resources/oozie-default.xml 
(original)
+++ 
incubator/oozie/branches/branch-3.2/core/src/main/resources/oozie-default.xml 
Tue May  1 04:44:11 2012
@@ -291,18 +291,7 @@
 
     <property>
         <name>oozie.service.CallableQueueService.InterruptTypes</name>
-        <value>
-            kill,
-            resume,
-            suspend,
-            bundle_kill,
-            bundle_resume,
-            bundle_suspend,
-            coord_kill,
-            coord_change,
-            coord_resume,
-            coord_suspend
-        </value>
+        
<value>kill,resume,suspend,bundle_kill,bundle_resume,bundle_suspend,coord_kill,coord_change,coord_resume,coord_suspend</value>
         <description>
             Getting the types of XCommands that are considered to be of 
Interrupt type
         </description>

Modified: incubator/oozie/branches/branch-3.2/release-log.txt
URL: 
http://svn.apache.org/viewvc/incubator/oozie/branches/branch-3.2/release-log.txt?rev=1332521&r1=1332520&r2=1332521&view=diff
==============================================================================
--- incubator/oozie/branches/branch-3.2/release-log.txt (original)
+++ incubator/oozie/branches/branch-3.2/release-log.txt Tue May  1 04:44:11 2012
@@ -1,5 +1,6 @@
 -- Oozie 3.2.0 release
 
+OOZIE-819: Interrupt map doesn't have unique set of commands for a given 
jobid(virag via Mohammad)
 OOZIE-810 Modify Oozie POM to pickup doxia 9.2y from a repo where it is 
avail(tucu via Mohammad)
 OOZIE-241 EL function(s) to expose action output data as a (XML/JSON/PROP) 
blob (tucu)
 OOZIE-820 Shell action to support env-var value with = sign (Mona via Mohammad)


Reply via email to