Author: kamrul
Date: Tue May 1 04:44:11 2012
New Revision: 1332521
URL: http://svn.apache.org/viewvc?rev=1332521&view=rev
Log:
OOZIE-819: Interrupt map doesn't have unique set of commands for a given
jobid(virag via Mohammad)
Modified:
incubator/oozie/branches/branch-3.2/core/src/main/java/org/apache/oozie/command/XCommand.java
incubator/oozie/branches/branch-3.2/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
incubator/oozie/branches/branch-3.2/core/src/main/resources/oozie-default.xml
incubator/oozie/branches/branch-3.2/release-log.txt
Modified:
incubator/oozie/branches/branch-3.2/core/src/main/java/org/apache/oozie/command/XCommand.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/branches/branch-3.2/core/src/main/java/org/apache/oozie/command/XCommand.java?rev=1332521&r1=1332520&r2=1332521&view=diff
==============================================================================
---
incubator/oozie/branches/branch-3.2/core/src/main/java/org/apache/oozie/command/XCommand.java
(original)
+++
incubator/oozie/branches/branch-3.2/core/src/main/java/org/apache/oozie/command/XCommand.java
Tue May 1 04:44:11 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -33,6 +33,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -193,17 +194,16 @@ public abstract class XCommand<T> implem
* @throws CommandException thrown i the lock could not be obtained.
*/
private void acquireLock() throws InterruptedException, CommandException {
- if (getEntityKey() == null) {
- // no lock for null entity key
- return;
- }
+ if (getEntityKey() == null) {
+ // no lock for null entity key
+ return;
+ }
lock =
Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(),
getLockTimeOut());
if (lock == null) {
Instrumentation instrumentation =
Services.get().get(InstrumentationService.class).get();
instrumentation.incr(INSTRUMENTATION_GROUP, getName() +
".lockTimeOut", 1);
if (isReQueueRequired()) {
//if not acquire the lock, re-queue itself with default delay
- resetUsed();
queue(this, getRequeueDelay());
LOG.debug("Could not get lock [{0}], timed out [{1}]ms, and
requeue itself [{2}]", this.toString(), getLockTimeOut(), getName());
} else {
@@ -232,16 +232,11 @@ public abstract class XCommand<T> implem
*/
@Override
public final T call() throws CommandException {
- if (used.compareAndSet(true, true)) {
- // avoid throwing needless exceptions in case of interrupts
- if (this.inInterruptMode()) {
- LOG.debug("Command [{0}] key [{1}] already used", getName(),
getEntityKey());
- return null;
- }
- else {
- throw new
IllegalStateException(this.getClass().getSimpleName() + " already used.");
- }
+ if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) &&
used.get()) {
+ LOG.debug("Command [{0}] key [{1}] already used for [{2}]",
getName(), getEntityKey(), this.toString());
+ return null;
}
+
commandQueue = null;
Instrumentation instrumentation =
Services.get().get(InstrumentationService.class).get();
instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions",
1);
@@ -264,7 +259,13 @@ public abstract class XCommand<T> implem
if (lock != null) {
this.executeInterrupts();
}
+
if (!isLockRequired() || (lock != null) ||
this.inInterruptMode()) {
+ if
(CallableQueueService.INTERRUPT_TYPES.contains(this.getType())
+ && !used.compareAndSet(false, true)) {
+ LOG.debug("Command [{0}] key [{1}] already executed
for [{2}]", getName(), getEntityKey(), this.toString());
+ return null;
+ }
LOG.debug("Load state for [{0}]", getEntityKey());
loadState();
LOG = XLog.resetPrefix(LOG);
@@ -330,7 +331,7 @@ public abstract class XCommand<T> implem
protected void executeInterrupts() {
CallableQueueService callableQueueService =
Services.get().get(CallableQueueService.class);
// getting all the list of interrupts to be executed
- List<XCallable<?>> callables =
callableQueueService.checkInterrupts(this.getEntityKey());
+ Set<XCallable<?>> callables =
callableQueueService.checkInterrupts(this.getEntityKey());
if (callables != null) {
// executing the list of interrupts in the given order of insertion
Modified:
incubator/oozie/branches/branch-3.2/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/branches/branch-3.2/core/src/main/java/org/apache/oozie/service/CallableQueueService.java?rev=1332521&r1=1332520&r2=1332521&view=diff
==============================================================================
---
incubator/oozie/branches/branch-3.2/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
(original)
+++
incubator/oozie/branches/branch-3.2/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
Tue May 1 04:44:11 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -90,9 +91,9 @@ public class CallableQueueService implem
private final Map<String, Date> uniqueCallables = new
ConcurrentHashMap<String, Date>();
- private final ConcurrentHashMap<String, List<XCallable<?>>>
interruptCommandsMap = new ConcurrentHashMap<String, List<XCallable<?>>>();
+ private final ConcurrentHashMap<String, Set<XCallable<?>>>
interruptCommandsMap = new ConcurrentHashMap<String, Set<XCallable<?>>>();
- private final HashSet<String> interruptTypes = new HashSet<String>();
+ public static final HashSet<String> INTERRUPT_TYPES = new
HashSet<String>();
private int interruptMapMaxSize;
@@ -446,7 +447,8 @@ public class CallableQueueService implem
boolean callableNextEligible =
conf.getBoolean(CONF_CALLABLE_NEXT_ELIGIBLE, true);
for (String type :
conf.getStringCollection(CONF_CALLABLE_INTERRUPT_TYPES)) {
- interruptTypes.add(type);
+ log.debug("Adding interrupt type [{0}]", type);
+ INTERRUPT_TYPES.add(type);
}
if (!callableNextEligible) {
@@ -686,7 +688,7 @@ public class CallableQueueService implem
* exist a List of Interrupt Callable for the same lock key will
bereturned,
* otherwise it will return null
*/
- public List<XCallable<?>> checkInterrupts(String lockKey) {
+ public Set<XCallable<?>> checkInterrupts(String lockKey) {
if (lockKey != null) {
return interruptCommandsMap.remove(lockKey);
@@ -703,12 +705,12 @@ public class CallableQueueService implem
public void checkInterruptTypes(XCallable<?> callable) {
if ((callable instanceof CompositeCallable) && (((CompositeCallable)
callable).getCallables() != null)) {
for (XCallable<?> singleCallable : ((CompositeCallable)
callable).getCallables()) {
- if (interruptTypes.contains(singleCallable.getType())) {
+ if (INTERRUPT_TYPES.contains(singleCallable.getType())) {
insertCallableIntoInterruptMap(singleCallable);
}
}
}
- else if (interruptTypes.contains(callable.getType())) {
+ else if (INTERRUPT_TYPES.contains(callable.getType())) {
insertCallableIntoInterruptMap(callable);
}
}
@@ -721,14 +723,16 @@ public class CallableQueueService implem
*/
public void insertCallableIntoInterruptMap(XCallable<?> callable) {
if (interruptCommandsMap.size() < interruptMapMaxSize) {
- List<XCallable<?>> newList = Collections.synchronizedList(new
ArrayList<XCallable<?>>());
- List<XCallable<?>> interruptList =
interruptCommandsMap.putIfAbsent(callable.getEntityKey(), newList);
- if (interruptList == null) {
- interruptList = newList;
- }
- interruptList.add(callable);
- log.trace("Inserting an interrupt element [{1}] to the interrupt
map", interruptCommandsMap.size(),
- callable.toString());
+ Set<XCallable<?>> newSet = Collections.synchronizedSet(new
LinkedHashSet<XCallable<?>>());
+ Set<XCallable<?>> interruptSet =
interruptCommandsMap.putIfAbsent(callable.getEntityKey(), newSet);
+ if (interruptSet == null) {
+ interruptSet = newSet;
+ }
+ if (interruptSet.add(callable)) {
+ log.trace("Inserting an interrupt element [{0}] to the
interrupt map", callable.toString());
+ } else {
+ log.trace("Interrupt element [{0}] already present",
callable.toString());
+ }
}
else {
log.warn(
Modified:
incubator/oozie/branches/branch-3.2/core/src/main/resources/oozie-default.xml
URL:
http://svn.apache.org/viewvc/incubator/oozie/branches/branch-3.2/core/src/main/resources/oozie-default.xml?rev=1332521&r1=1332520&r2=1332521&view=diff
==============================================================================
---
incubator/oozie/branches/branch-3.2/core/src/main/resources/oozie-default.xml
(original)
+++
incubator/oozie/branches/branch-3.2/core/src/main/resources/oozie-default.xml
Tue May 1 04:44:11 2012
@@ -291,18 +291,7 @@
<property>
<name>oozie.service.CallableQueueService.InterruptTypes</name>
- <value>
- kill,
- resume,
- suspend,
- bundle_kill,
- bundle_resume,
- bundle_suspend,
- coord_kill,
- coord_change,
- coord_resume,
- coord_suspend
- </value>
+
<value>kill,resume,suspend,bundle_kill,bundle_resume,bundle_suspend,coord_kill,coord_change,coord_resume,coord_suspend</value>
<description>
Getting the types of XCommands that are considered to be of
Interrupt type
</description>
Modified: incubator/oozie/branches/branch-3.2/release-log.txt
URL:
http://svn.apache.org/viewvc/incubator/oozie/branches/branch-3.2/release-log.txt?rev=1332521&r1=1332520&r2=1332521&view=diff
==============================================================================
--- incubator/oozie/branches/branch-3.2/release-log.txt (original)
+++ incubator/oozie/branches/branch-3.2/release-log.txt Tue May 1 04:44:11 2012
@@ -1,5 +1,6 @@
-- Oozie 3.2.0 release
+OOZIE-819: Interrupt map doesn't have unique set of commands for a given
jobid(virag via Mohammad)
OOZIE-810 Modify Oozie POM to pickup doxia 9.2y from a repo where it is
avail(tucu via Mohammad)
OOZIE-241 EL function(s) to expose action output data as a (XML/JSON/PROP)
blob (tucu)
OOZIE-820 Shell action to support env-var value with = sign (Mona via Mohammad)