Author: kamrul
Date: Tue Mar 20 20:10:08 2012
New Revision: 1303127
URL: http://svn.apache.org/viewvc?rev=1303127&view=rev
Log:
OOZIE-587 Out of memory issues due to current query design. (virag via mohammad)
Modified:
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetJPAExecutor.java
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
incubator/oozie/branches/branch-3.1.3/release-log.txt
Modified:
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java?rev=1303127&r1=1303126&r2=1303127&view=diff
==============================================================================
---
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
(original)
+++
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
Tue Mar 20 20:10:08 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -49,53 +49,70 @@ import org.apache.openjpa.persistence.jd
@Entity
@NamedQueries({
- @NamedQuery(name = "UPDATE_COORD_ACTION", query = "update
CoordinatorActionBean w set w.actionNumber = :actionNumber, w.actionXml =
:actionXml, w.consoleUrl = :consoleUrl, w.createdConf = :createdConf,
w.errorCode = :errorCode, w.errorMessage = :errorMessage, w.externalStatus =
:externalStatus, w.missingDependencies = :missingDependencies, w.runConf =
:runConf, w.timeOut = :timeOut, w.trackerUri = :trackerUri, w.type = :type,
w.createdTimestamp = :createdTime, w.externalId = :externalId, w.jobId =
:jobId, w.lastModifiedTimestamp = :lastModifiedTime, w.nominalTimestamp =
:nominalTime, w.slaXml = :slaXml, w.status = :status where w.id = :id"),
+ @NamedQuery(name = "UPDATE_COORD_ACTION", query = "update
CoordinatorActionBean w set w.actionNumber = :actionNumber, w.actionXml =
:actionXml, w.consoleUrl = :consoleUrl, w.createdConf = :createdConf,
w.errorCode = :errorCode, w.errorMessage = :errorMessage, w.externalStatus =
:externalStatus, w.missingDependencies = :missingDependencies, w.runConf =
:runConf, w.timeOut = :timeOut, w.trackerUri = :trackerUri, w.type = :type,
w.createdTimestamp = :createdTime, w.externalId = :externalId, w.jobId =
:jobId, w.lastModifiedTimestamp = :lastModifiedTime, w.nominalTimestamp =
:nominalTime, w.slaXml = :slaXml, w.status = :status where w.id = :id"),
- @NamedQuery(name = "UPDATE_COORD_ACTION_MIN", query = "update
CoordinatorActionBean w set w.actionXml = :actionXml, w.missingDependencies =
:missingDependencies, w.lastModifiedTimestamp = :lastModifiedTime, w.status =
:status where w.id = :id"),
+ @NamedQuery(name = "UPDATE_COORD_ACTION_MIN", query = "update
CoordinatorActionBean w set w.actionXml = :actionXml, w.missingDependencies =
:missingDependencies, w.lastModifiedTimestamp = :lastModifiedTime, w.status =
:status where w.id = :id"),
+ // Query to update the action status, pending status and last modified time stamp of a Coordinator action
+ @NamedQuery(name = "UPDATE_COORD_ACTION_STATUS_PENDING_TIME", query =
"update CoordinatorActionBean w set w.status =:status, w.pending =:pending,
w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),
- @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR", query =
"delete from CoordinatorActionBean a where a.jobId = :jobId and (a.status =
'SUCCEEDED' OR a.status = 'FAILED' OR a.status= 'KILLED')"),
+ @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR", query =
"delete from CoordinatorActionBean a where a.jobId = :jobId and (a.status =
'SUCCEEDED' OR a.status = 'FAILED' OR a.status= 'KILLED')"),
- @NamedQuery(name = "GET_COORD_ACTIONS", query = "select OBJECT(w) from
CoordinatorActionBean w"),
+ @NamedQuery(name = "GET_COORD_ACTIONS", query = "select OBJECT(w) from
CoordinatorActionBean w"),
- @NamedQuery(name = "GET_COMPLETED_ACTIONS_OLDER_THAN", query = "select
OBJECT(a) from CoordinatorActionBean a where a.createdTimestamp < :createdTime
and (a.status = 'SUCCEEDED' OR a.status = 'FAILED' OR a.status = 'KILLED')"),
+ @NamedQuery(name = "GET_COMPLETED_ACTIONS_OLDER_THAN", query = "select
OBJECT(a) from CoordinatorActionBean a where a.createdTimestamp < :createdTime
and (a.status = 'SUCCEEDED' OR a.status = 'FAILED' OR a.status = 'KILLED')"),
- @NamedQuery(name = "GET_COORD_ACTION", query = "select OBJECT(a) from
CoordinatorActionBean a where a.id = :id"),
+ @NamedQuery(name = "GET_COORD_ACTION", query = "select OBJECT(a) from
CoordinatorActionBean a where a.id = :id"),
- @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select
OBJECT(a) from CoordinatorActionBean a where a.externalId = :externalId"),
+ @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select
OBJECT(a) from CoordinatorActionBean a where a.externalId = :externalId"),
- @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_FIFO", query = "select
OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status =
'READY' order by a.nominalTimestamp"),
+ @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_FIFO", query = "select
OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status =
'READY' order by a.nominalTimestamp"),
- @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_LIFO", query = "select
OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status =
'READY' order by a.nominalTimestamp desc"),
+ @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_LIFO", query = "select
OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status =
'READY' order by a.nominalTimestamp desc"),
- @NamedQuery(name = "GET_COORD_RUNNING_ACTIONS_COUNT", query = "select
count(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status =
'RUNNING' OR a.status='SUBMITTED')"),
+ @NamedQuery(name = "GET_COORD_RUNNING_ACTIONS_COUNT", query = "select
count(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status =
'RUNNING' OR a.status='SUBMITTED')"),
- @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_BY_JOBID", query = "select
count(a) from CoordinatorActionBean a where a.jobId = :jobId"),
+ @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_BY_JOBID", query = "select
count(a) from CoordinatorActionBean a where a.jobId = :jobId"),
- @NamedQuery(name = "GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID", query =
"select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND
a.status = 'WAITING'"),
+ @NamedQuery(name = "GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID", query =
"select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND
a.status = 'WAITING'"),
- @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_COUNT", query =
"select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND
a.pending = 0 AND (a.status = 'SUSPENDED' OR a.status = 'TIMEDOUT' OR a.status
= 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED')"),
+ @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_COUNT", query =
"select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND
a.pending = 0 AND (a.status = 'SUSPENDED' OR a.status = 'TIMEDOUT' OR a.status
= 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED')"),
- @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_STATUS_COUNT", query =
"select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND
a.pending = 0 AND a.status = :status"),
+ @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_STATUS_COUNT",
query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId
AND a.pending = 0 AND a.status = :status"),
- @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB", query = "select OBJECT(a)
from CoordinatorActionBean a where a.jobId = :jobId"),
+ @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB", query = "select
OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId"),
- @NamedQuery(name = "GET_COORD_ACTION_FOR_COORD_JOB_BY_ACTION_NUMBER",
query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId
AND a.actionNumber = :actionNumber"),
+ // Query to retrieve action id, action status, pending status and external Id of not completed Coordinator actions
+ @NamedQuery(name = "GET_COORD_ACTIONS_NOT_COMPLETED", query = "select
a.id, a.status, a.pending, a.externalId from CoordinatorActionBean a where
a.jobId = :jobId AND a.status <> 'FAILED' AND a.status <> 'TIMEDOUT' AND
a.status <> 'SUCCEEDED' AND a.status <> 'KILLED'"),
- @NamedQuery(name = "GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME", query =
"select OBJECT(w) from CoordinatorActionBean w where w.lastModifiedTimestamp >=
:lastModifiedTime"),
+ // Query to retrieve action id, action status, pending status and external Id of running Coordinator actions
+ @NamedQuery(name = "GET_COORD_ACTIONS_RUNNING", query = "select a.id,
a.status, a.pending, a.externalId from CoordinatorActionBean a where a.jobId =
:jobId and a.status = 'RUNNING'"),
- @NamedQuery(name = "GET_RUNNING_ACTIONS_FOR_COORD_JOB", query = "select
OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status =
'RUNNING'"),
+ // Query to retrieve action id, action status, pending status and external Id of suspended Coordinator actions
+ @NamedQuery(name = "GET_COORD_ACTIONS_SUSPENDED", query = "select
a.id, a.status, a.pending, a.externalId from CoordinatorActionBean a where
a.jobId = :jobId and a.status = 'SUSPENDED'"),
- @NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select
OBJECT(a) from CoordinatorActionBean a where a.status = 'RUNNING' AND
a.lastModifiedTimestamp <= :lastModifiedTime"),
+ // Query to retrieve count of Coordinator actions which are pending
+ @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_COUNT", query = "select
count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending >
0"),
- @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", query
= "select OBJECT(a) from CoordinatorActionBean a where (a.status = 'WAITING' OR
a.status = 'SUBMITTED') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
+ // Query to retrieve status of Coordinator actions which are not pending
+ @NamedQuery(name = "GET_COORD_ACTIONS_STATUS_BY_PENDING_FALSE", query
= "select a.status from CoordinatorActionBean a where a.jobId = :jobId AND
a.pending = 0"),
- @NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query =
"select OBJECT(a) from CoordinatorActionBean a where a.pending > 0 AND
(a.status = 'SUSPENDED' OR a.status = 'KILLED' OR a.status = 'RUNNING') AND
a.lastModifiedTimestamp <= :lastModifiedTime"),
+ @NamedQuery(name = "GET_COORD_ACTION_FOR_COORD_JOB_BY_ACTION_NUMBER",
query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId
AND a.actionNumber = :actionNumber"),
- @NamedQuery(name = "GET_ACTIONS_FOR_DATES", query = "select OBJECT(a) from
CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'TIMEDOUT' OR
a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED') AND
a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
+ @NamedQuery(name = "GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME", query =
"select OBJECT(w) from CoordinatorActionBean w where w.lastModifiedTimestamp >=
:lastModifiedTime"),
- @NamedQuery(name = "GET_ACTION_FOR_NOMINALTIME", query = "select OBJECT(a)
from CoordinatorActionBean a where a.jobId = :jobId AND a.nominalTimestamp =
:nominalTime"),
+ @NamedQuery(name = "GET_RUNNING_ACTIONS_FOR_COORD_JOB", query =
"select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND
a.status = 'RUNNING'"),
- @NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w)
from CoordinatorActionBean w")})
+ @NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select
OBJECT(a) from CoordinatorActionBean a where a.status = 'RUNNING' AND
a.lastModifiedTimestamp <= :lastModifiedTime"),
+
+ @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN",
query = "select OBJECT(a) from CoordinatorActionBean a where (a.status =
'WAITING' OR a.status = 'SUBMITTED') AND a.lastModifiedTimestamp <=
:lastModifiedTime"),
+
+ @NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query
= "select OBJECT(a) from CoordinatorActionBean a where a.pending > 0 AND
(a.status = 'SUSPENDED' OR a.status = 'KILLED' OR a.status = 'RUNNING') AND
a.lastModifiedTimestamp <= :lastModifiedTime"),
+
+ @NamedQuery(name = "GET_ACTIONS_FOR_DATES", query = "select OBJECT(a)
from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'TIMEDOUT'
OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED') AND
a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
+
+ @NamedQuery(name = "GET_ACTION_FOR_NOMINALTIME", query = "select
OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND
a.nominalTimestamp = :nominalTime"),
+
+ @NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w)
from CoordinatorActionBean w")})
@NamedNativeQueries({
@@ -152,8 +169,10 @@ public class CoordinatorActionBean exten
* Serialize the coordinator bean to a data output.
*
* @param dataOutput data output.
- * @throws IOException thrown if the coordinator bean could not be
serialized.
+ * @throws IOException thrown if the coordinator bean could not be
+ * serialized.
*/
+ @Override
public void write(DataOutput dataOutput) throws IOException {
WritableUtils.writeStr(dataOutput, getJobId());
WritableUtils.writeStr(dataOutput, getType());
@@ -175,8 +194,10 @@ public class CoordinatorActionBean exten
* Deserialize a coordinator bean from a data input.
*
* @param dataInput data input.
- * @throws IOException thrown if the workflow bean could not be
deserialized.
+ * @throws IOException thrown if the workflow bean could not be
+ * deserialized.
*/
+ @Override
public void readFields(DataInput dataInput) throws IOException {
setJobId(WritableUtils.readStr(dataInput));
setType(WritableUtils.readStr(dataInput));
@@ -343,7 +364,7 @@ public class CoordinatorActionBean exten
* @return pending
*/
public int decrementAndGetPending() {
- this.pending = Math.max(this.pending-1, 0);
+ this.pending = Math.max(this.pending - 1, 0);
return pending;
}
Modified:
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java?rev=1303127&r1=1303126&r2=1303127&view=diff
==============================================================================
---
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
(original)
+++
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
Tue Mar 20 20:10:08 2012
@@ -67,17 +67,23 @@ public class CoordActionCheckXCommand ex
if (wf.getStatus() == WorkflowJob.Status.SUCCEEDED) {
coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED);
+ // set pending to false as the status is SUCCEEDED
+ coordAction.setPending(0);
slaStatus = Status.SUCCEEDED;
}
else {
if (wf.getStatus() == WorkflowJob.Status.FAILED) {
coordAction.setStatus(CoordinatorAction.Status.FAILED);
slaStatus = Status.FAILED;
+ // set pending to false as the status is FAILED
+ coordAction.setPending(0);
}
else {
if (wf.getStatus() == WorkflowJob.Status.KILLED) {
coordAction.setStatus(CoordinatorAction.Status.KILLED);
slaStatus = Status.KILLED;
+ // set pending to false as the status is KILLED
+ coordAction.setPending(0);
}
else {
LOG.warn("Unexpected workflow " + wf.getId() + "
STATUS " + wf.getStatus());
Modified:
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java?rev=1303127&r1=1303126&r2=1303127&view=diff
==============================================================================
---
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
(original)
+++
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
Tue Mar 20 20:10:08 2012
@@ -28,8 +28,8 @@ import org.apache.oozie.command.wf.KillX
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.KillTransitionXCommand;
import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetActionsNotCompletedJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
@@ -72,7 +72,8 @@ public class CoordKillXCommand extends K
if (jpaService != null) {
this.coordJob = jpaService.execute(new
CoordJobGetJPAExecutor(jobId));
- this.actionList = jpaService.execute(new
CoordJobGetActionsJPAExecutor(jobId));
+ //Get actions which are not succeeded, failed, timed out or killed
+ this.actionList = jpaService.execute(new
CoordJobGetActionsNotCompletedJPAExecutor(jobId));
prevStatus = coordJob.getStatus();
LogUtils.setLogInfo(coordJob, logInfo);
}
@@ -105,7 +106,7 @@ public class CoordKillXCommand extends K
action.incrementAndGetPending();
action.setLastModifiedTime(new Date());
try {
- jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+ jpaService.execute(new CoordActionUpdateStatusJPAExecutor(action));
}
catch (JPAExecutorException e) {
throw new CommandException(e);
@@ -117,25 +118,22 @@ public class CoordKillXCommand extends K
try {
if (actionList != null) {
for (CoordinatorActionBean action : actionList) {
- if (action.getStatus() !=
CoordinatorActionBean.Status.FAILED
- && action.getStatus() !=
CoordinatorActionBean.Status.TIMEDOUT
- && action.getStatus() !=
CoordinatorActionBean.Status.SUCCEEDED
- && action.getStatus() !=
CoordinatorActionBean.Status.KILLED) {
- // queue a WorkflowKillXCommand to delete the workflow
job and actions
- if (action.getExternalId() != null) {
- queue(new KillXCommand(action.getExternalId()));
- updateCoordAction(action);
- LOG.debug("Killed coord action = [{0}], new status
= [{1}], pending = [{2}] and queue KillXCommand for [{3}]",
- action.getId(),
action.getStatus(), action.getPending(), action.getExternalId());
- }
- else {
- updateCoordAction(action);
- LOG.debug("Killed coord action = [{0}], current
status = [{1}], pending = [{2}]", action.getId(), action
- .getStatus(), action.getPending());
- }
+ // queue a WorkflowKillXCommand to delete the workflow job and actions
+ if (action.getExternalId() != null) {
+ queue(new KillXCommand(action.getExternalId()));
+ updateCoordAction(action);
+ LOG.debug(
+ "Killed coord action = [{0}], new status =
[{1}], pending = [{2}] and queue KillXCommand for [{3}]",
+ action.getId(), action.getStatus(),
action.getPending(), action.getExternalId());
+ }
+ else {
+ updateCoordAction(action);
+ LOG.debug("Killed coord action = [{0}], current status
= [{1}], pending = [{2}]",
+ action.getId(), action.getStatus(),
action.getPending());
}
}
}
+
jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
LOG.debug("Killed coord actions for the coordinator=[{0}]", jobId);
Modified:
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java?rev=1303127&r1=1303126&r2=1303127&view=diff
==============================================================================
---
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
(original)
+++
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
Tue Mar 20 20:10:08 2012
@@ -31,8 +31,8 @@ import org.apache.oozie.command.Precondi
import org.apache.oozie.command.ResumeTransitionXCommand;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.command.wf.ResumeXCommand;
-import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetActionsSuspendedJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
@@ -113,7 +113,8 @@ public class CoordResumeXCommand extends
InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
coordJob.setSuspendedTime(null);
coordJob.setLastModifiedTime(new Date());
- LOG.debug("Resume coordinator job id = " + jobId + ", status = " +
coordJob.getStatus() + ", pending = " + coordJob.isPending());
+ LOG.debug("Resume coordinator job id = " + jobId + ", status = " +
coordJob.getStatus() + ", pending = "
+ + coordJob.isPending());
try {
jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
}
@@ -128,23 +129,27 @@ public class CoordResumeXCommand extends
@Override
public void resumeChildren() throws CommandException {
try {
- List<CoordinatorActionBean> actionList = jpaService.execute(new
CoordJobGetActionsJPAExecutor(jobId));
+ // Get all suspended actions to resume them
+ List<CoordinatorActionBean> actionList = jpaService.execute(new
CoordJobGetActionsSuspendedJPAExecutor(
+ jobId));
for (CoordinatorActionBean action : actionList) {
- if(action.getStatus() ==
CoordinatorActionBean.Status.SUSPENDED){
+ if (action.getExternalId() != null) {
// queue a ResumeXCommand
- if (action.getExternalId() != null) {
- queue(new ResumeXCommand(action.getExternalId()));
- updateCoordAction(action);
- LOG.debug("Resume coord action = [{0}], new status =
[{1}], pending = [{2}] and queue ResumeXCommand for [{3}]",
- action.getId(), action.getStatus(),
action.getPending(), action.getExternalId());
- }else {
- updateCoordAction(action);
- LOG.debug("Resume coord action = [{0}], new status =
[{1}], pending = [{2}] and external id is null",
- action.getId(), action.getStatus(),
action.getPending());
- }
+ queue(new ResumeXCommand(action.getExternalId()));
+ updateCoordAction(action);
+ LOG.debug(
+ "Resume coord action = [{0}], new status = [{1}],
pending = [{2}] and queue ResumeXCommand for [{3}]",
+ action.getId(), action.getStatus(),
action.getPending(), action.getExternalId());
+ }
+ else {
+ updateCoordAction(action);
+ LOG.debug(
+ "Resume coord action = [{0}], new status = [{1}],
pending = [{2}] and external id is null",
+ action.getId(), action.getStatus(),
action.getPending());
}
}
+
}
catch (XException ex) {
exceptionOccured = true;
@@ -154,8 +159,8 @@ public class CoordResumeXCommand extends
if (exceptionOccured) {
coordJob.setStatus(CoordinatorJob.Status.FAILED);
coordJob.resetPending();
- LOG.warn("Resume children failed so fail coordinator,
coordinator job id = " + jobId
- + ", status = " + coordJob.getStatus());
+ LOG.warn("Resume children failed so fail coordinator,
coordinator job id = " + jobId + ", status = "
+ + coordJob.getStatus());
try {
jpaService.execute(new
CoordJobUpdateJPAExecutor(coordJob));
}
@@ -183,7 +188,7 @@ public class CoordResumeXCommand extends
action.incrementAndGetPending();
action.setLastModifiedTime(new Date());
try {
- jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+ jpaService.execute(new CoordActionUpdateStatusJPAExecutor(action));
}
catch (JPAExecutorException e) {
throw new CommandException(e);
Modified:
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java?rev=1303127&r1=1303126&r2=1303127&view=diff
==============================================================================
---
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
(original)
+++
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
Tue Mar 20 20:10:08 2012
@@ -31,8 +31,8 @@ import org.apache.oozie.command.Precondi
import org.apache.oozie.command.SuspendTransitionXCommand;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.command.wf.SuspendXCommand;
-import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetActionsRunningJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
@@ -118,22 +118,25 @@ public class CoordSuspendXCommand extend
@Override
public void suspendChildren() throws CommandException {
try {
- List<CoordinatorActionBean> actionList = jpaService.execute(new
CoordJobGetActionsJPAExecutor(jobId));
+ //Get all running actions of a job to suspend them
+ List<CoordinatorActionBean> actionList = jpaService
+ .execute(new CoordJobGetActionsRunningJPAExecutor(jobId));
for (CoordinatorActionBean action : actionList) {
- if (action.getStatus() ==
CoordinatorActionBean.Status.RUNNING) {
- // queue a SuspendXCommand
- if (action.getExternalId() != null) {
- queue(new SuspendXCommand(action.getExternalId()));
- updateCoordAction(action);
- LOG.debug("Suspend coord action = [{0}], new status =
[{1}], pending = [{2}] and queue SuspendXCommand for [{3}]",
- action.getId(), action.getStatus(),
action.getPending(), action.getExternalId());
- } else {
- updateCoordAction(action);
- LOG.debug("Suspend coord action = [{0}], new status =
[{1}], pending = [{2}] and external id is null",
- action.getId(), action.getStatus(),
action.getPending());
- }
-
+ // queue a SuspendXCommand
+ if (action.getExternalId() != null) {
+ queue(new SuspendXCommand(action.getExternalId()));
+ updateCoordAction(action);
+ LOG.debug(
+ "Suspend coord action = [{0}], new status = [{1}],
pending = [{2}] and queue SuspendXCommand for [{3}]",
+ action.getId(), action.getStatus(),
action.getPending(), action.getExternalId());
+ }
+ else {
+ updateCoordAction(action);
+ LOG.debug(
+ "Suspend coord action = [{0}], new status = [{1}],
pending = [{2}] and external id is null",
+ action.getId(), action.getStatus(),
action.getPending());
}
+
}
LOG.debug("Suspended coordinator actions for the
coordinator=[{0}]", jobId);
}
@@ -145,7 +148,8 @@ public class CoordSuspendXCommand extend
if (exceptionOccured) {
coordJob.setStatus(CoordinatorJob.Status.FAILED);
coordJob.resetPending();
- LOG.debug("Exception happened, fail coordinator job id = " +
jobId + ", status = " + coordJob.getStatus());
+ LOG.debug("Exception happened, fail coordinator job id = " +
jobId + ", status = "
+ + coordJob.getStatus());
try {
jpaService.execute(new
CoordJobUpdateJPAExecutor(coordJob));
}
@@ -190,7 +194,7 @@ public class CoordSuspendXCommand extend
action.incrementAndGetPending();
action.setLastModifiedTime(new Date());
try {
- jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+ jpaService.execute(new CoordActionUpdateStatusJPAExecutor(action));
}
catch (JPAExecutorException e) {
throw new CommandException(e);
Modified:
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetJPAExecutor.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetJPAExecutor.java?rev=1303127&r1=1303126&r2=1303127&view=diff
==============================================================================
---
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetJPAExecutor.java
(original)
+++
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetJPAExecutor.java
Tue Mar 20 20:10:08 2012
@@ -93,6 +93,7 @@ public class CoordActionGetJPAExecutor i
action.setNominalTime(a.getNominalTime());
action.setSlaXml(a.getSlaXml());
action.setStatus(a.getStatus());
+ action.setPending(a.getPending());
return action;
}
return null;
Modified:
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/service/StatusTransitService.java?rev=1303127&r1=1303126&r2=1303127&view=diff
==============================================================================
---
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
(original)
+++
incubator/oozie/branches/branch-3.1.3/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
Tue Mar 20 20:10:08 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -41,8 +41,9 @@ import org.apache.oozie.executor.jpa.Bun
import org.apache.oozie.executor.jpa.BundleJobsGetPendingJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobsGetRunningJPAExecutor;
import
org.apache.oozie.executor.jpa.CoordActionsGetByLastModifiedTimeJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
+import
org.apache.oozie.executor.jpa.CoordJobGetActionsStatusByPendingFalseJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetPendingActionsCountJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsGetPendingJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
@@ -84,6 +85,7 @@ public class StatusTransitService implem
}
}
+ @Override
public void run() {
try {
Date curDate = new Date(); // records the start time of this
service run;
@@ -251,51 +253,49 @@ public class StatusTransitService implem
String jobId = coordJob.getId();
Job.Status[] coordStatus = new Job.Status[1];
coordStatus[0] = coordJob.getStatus();
- List<CoordinatorActionBean> coordActions = jpaService
- .execute(new
CoordJobGetActionsJPAExecutor(jobId));
+ //Get count of Coordinator actions with pending true
+ int count = jpaService.execute(new
CoordJobGetPendingActionsCountJPAExecutor(jobId));
+ if (count > 0) {
+ continue;
+ }
+ // Code below this is executed only when none of the Coordinator actions are pending
+ // Get status of Coordinator actions with pending false
+ List<CoordinatorAction.Status> coordActionStatusList =
jpaService
+ .execute(new
CoordJobGetActionsStatusByPendingFalseJPAExecutor(jobId));
HashMap<CoordinatorAction.Status, Integer>
coordActionStatus = new HashMap<CoordinatorAction.Status, Integer>();
- boolean foundPending = false;
- for (CoordinatorActionBean cAction : coordActions) {
- if (!cAction.isPending()) {
- int counter = 0;
- if
(coordActionStatus.containsKey(cAction.getStatus())) {
- counter =
coordActionStatus.get(cAction.getStatus()) + 1;
- }
- else {
- ++counter;
- }
- coordActionStatus.put(cAction.getStatus(),
counter);
+
+ for (CoordinatorAction.Status status :
coordActionStatusList) {
+ int counter = 0;
+ if (coordActionStatus.containsKey(status)) {
+ counter = coordActionStatus.get(status) + 1;
}
else {
- foundPending = true;
- break;
+ ++counter;
}
+ coordActionStatus.put(status, counter);
}
- if (foundPending) {
- continue;
- }
-
+ int nonPendingCoordActionsCount =
coordActionStatusList.size();
if (coordJob.isDoneMaterialization()
- && checkCoordTerminalStatus(coordActionStatus,
coordActions, coordStatus)) {
+ && checkCoordTerminalStatus(coordActionStatus,
nonPendingCoordActionsCount, coordStatus)) {
LOG.info("Set coordinator job [" + jobId + "]
status to '" + coordStatus[0].toString()
+ "' from '" + coordJob.getStatus() + "'");
- updateCoordJob(coordActionStatus, coordActions,
coordJob, coordStatus[0]);
+ updateCoordJob(coordActionStatus,
nonPendingCoordActionsCount, coordJob, coordStatus[0]);
}
else if (coordJob.isDoneMaterialization()
- && checkCoordSuspendStatus(coordActionStatus,
coordActions, coordStatus)) {
+ && checkCoordSuspendStatus(coordActionStatus,
nonPendingCoordActionsCount, coordStatus)) {
LOG.info("Set coordinator job [" + jobId + "]
status to " + coordStatus[0].toString()
+ "' from '" + coordJob.getStatus() + "'");
- updateCoordJob(coordActionStatus, coordActions,
coordJob, coordStatus[0]);
+ updateCoordJob(coordActionStatus,
nonPendingCoordActionsCount, coordJob, coordStatus[0]);
}
- else if (checkCoordRunningStatus(coordActionStatus,
coordActions, coordStatus)) {
+ else if (checkCoordRunningStatus(coordActionStatus,
nonPendingCoordActionsCount, coordStatus)) {
LOG.info("Set coordinator job [" + jobId + "]
status to " + coordStatus[0].toString()
+ "' from '" + coordJob.getStatus() + "'");
- updateCoordJob(coordActionStatus, coordActions,
coordJob, coordStatus[0]);
+ updateCoordJob(coordActionStatus,
nonPendingCoordActionsCount, coordJob, coordStatus[0]);
}
// checking pending flag for job when user killed or suspended the job
else {
- checkCoordPending(coordActionStatus, coordActions,
coordJob, true);
+ checkCoordPending(coordActionStatus,
nonPendingCoordActionsCount, coordJob, true);
}
}
catch (Exception ex) {
@@ -353,7 +353,7 @@ public class StatusTransitService implem
}
private boolean
checkCoordTerminalStatus(HashMap<CoordinatorAction.Status, Integer>
coordActionStatus,
- List<CoordinatorActionBean> coordActions, Job.Status[]
coordStatus) {
+ int coordActionsCount, Job.Status[] coordStatus) {
boolean ret = false;
int totalValuesSucceed = 0;
if
(coordActionStatus.containsKey(CoordinatorAction.Status.SUCCEEDED)) {
@@ -373,18 +373,18 @@ public class StatusTransitService implem
totalValuesTimeOut =
coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT);
}
- if (coordActions.size() == (totalValuesSucceed + totalValuesFailed
+ totalValuesKilled + totalValuesTimeOut)) {
+ if (coordActionsCount == (totalValuesSucceed + totalValuesFailed +
totalValuesKilled + totalValuesTimeOut)) {
// If all the coordinator actions are succeeded then coordinator job should be succeeded.
- if (coordActions.size() == totalValuesSucceed) {
+ if (coordActionsCount == totalValuesSucceed) {
coordStatus[0] = Job.Status.SUCCEEDED;
ret = true;
}
- else if (coordActions.size() == totalValuesKilled) {
+ else if (coordActionsCount == totalValuesKilled) {
// If all the coordinator actions are KILLED then coordinator job should be KILLED.
coordStatus[0] = Job.Status.KILLED;
ret = true;
}
- else if (coordActions.size() == totalValuesFailed) {
+ else if (coordActionsCount == totalValuesFailed) {
// If all the coordinator actions are FAILED then coordinator job should be FAILED.
coordStatus[0] = Job.Status.FAILED;
ret = true;
@@ -451,10 +451,10 @@ public class StatusTransitService implem
}
private boolean
checkCoordSuspendStatus(HashMap<CoordinatorAction.Status, Integer>
coordActionStatus,
- List<CoordinatorActionBean> coordActions, Job.Status[]
coordStatus) {
+ int coordActionsCount, Job.Status[] coordStatus) {
boolean ret = false;
if
(coordActionStatus.containsKey(CoordinatorAction.Status.SUSPENDED)) {
- if (coordActions.size() ==
coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)) {
+ if (coordActionsCount ==
coordActionStatus.get(CoordinatorAction.Status.SUSPENDED)) {
coordStatus[0] = Job.Status.SUSPENDED;
ret = true;
}
@@ -463,11 +463,11 @@ public class StatusTransitService implem
}
private boolean
checkCoordRunningStatus(HashMap<CoordinatorAction.Status, Integer>
coordActionStatus,
- List<CoordinatorActionBean> coordActions, Job.Status[]
coordStatus) {
+ int coordActionsCount, Job.Status[] coordStatus) {
boolean ret = false;
if
(coordActionStatus.containsKey(CoordinatorAction.Status.RUNNING)) {
// If all the bundle actions are succeeded then bundle job should be succeeded.
- if (coordActions.size() ==
coordActionStatus.get(CoordinatorAction.Status.RUNNING)) {
+ if (coordActionsCount ==
coordActionStatus.get(CoordinatorAction.Status.RUNNING)) {
coordStatus[0] = Job.Status.RUNNING;
ret = true;
}
@@ -543,7 +543,7 @@ public class StatusTransitService implem
}
private void updateCoordJob(HashMap<CoordinatorAction.Status, Integer>
coordActionStatus,
- List<CoordinatorActionBean> coordActions, CoordinatorJobBean
coordJob, Job.Status coordStatus)
+ int coordActionsCount, CoordinatorJobBean coordJob, Job.Status
coordStatus)
throws JPAExecutorException, CommandException {
Job.Status prevStatus = coordJob.getStatus();
// Update the Coord Job
@@ -556,7 +556,7 @@ public class StatusTransitService implem
}
}
- checkCoordPending(coordActionStatus, coordActions, coordJob,
false);
+ checkCoordPending(coordActionStatus, coordActionsCount, coordJob,
false);
coordJob.setStatus(coordStatus);
coordJob.setStatus(StatusUtils.getStatus(coordJob));
coordJob.setLastModifiedTime(new Date());
@@ -571,7 +571,7 @@ public class StatusTransitService implem
}
private void checkCoordPending(HashMap<CoordinatorAction.Status,
Integer> coordActionStatus,
- List<CoordinatorActionBean> coordActions, CoordinatorJobBean
coordJob, boolean saveToDB) throws JPAExecutorException {
+ int coordActionsCount, CoordinatorJobBean coordJob, boolean
saveToDB) throws JPAExecutorException {
boolean pendingCoordJob = coordJob.isPending();
// Checking the coordinator pending should be updated or not
int totalNonPendingActions = 0;
@@ -579,7 +579,7 @@ public class StatusTransitService implem
totalNonPendingActions += coordActionStatus.get(js);
}
- if (totalNonPendingActions == coordActions.size()) {
+ if (totalNonPendingActions == coordActionsCount) {
pendingCoordJob = false;
}
@@ -613,7 +613,8 @@ public class StatusTransitService implem
else {
LOG.info("Running coordinator status service from last
instance time = "
+
DateUtils.convertDateToString(lastInstanceStartTime));
- // this is not the first instance, we should only check jobs that have actions been
+ // this is not the first instance, we should only check jobs
+ // that have actions been
// updated >= start time of last service run;
List<CoordinatorActionBean> actionList = jpaService
.execute(new
CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
@@ -664,3 +665,4 @@ public class StatusTransitService implem
return StatusTransitService.class;
}
}
+
Modified: incubator/oozie/branches/branch-3.1.3/release-log.txt
URL:
http://svn.apache.org/viewvc/incubator/oozie/branches/branch-3.1.3/release-log.txt?rev=1303127&r1=1303126&r2=1303127&view=diff
==============================================================================
--- incubator/oozie/branches/branch-3.1.3/release-log.txt (original)
+++ incubator/oozie/branches/branch-3.1.3/release-log.txt Tue Mar 20 20:10:08 2012
@@ -1,3 +1,6 @@
+-- Oozie 3.1.3.1 release
+OOZIE-587 Out of memory issues due to current query design.(virag via mohammad)
+
-- Oozie 3.1.3 release
OOZIE-694 Update the Install and Quick start guide with appropriate hadoop versions for branch-3.1 (Mohammad)