Author: angeloh
Date: Mon Feb 27 23:55:30 2012
New Revision: 1294403
URL: http://svn.apache.org/viewvc?rev=1294403&view=rev
Log:
Oozie-677 Add Filter API for status on coordinator actions (Virag Kothari via
Angelo)
Modified:
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleEngine.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/DagEngine.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsSubsetJPAExecutor.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockCoordinatorEngineService.java
incubator/oozie/trunk/docs/src/site/twiki/DG_CommandLineTool.twiki
incubator/oozie/trunk/release-log.txt
Modified:
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java?rev=1294403&r1=1294402&r2=1294403&view=diff
==============================================================================
---
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
(original)
+++
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
Mon Feb 27 23:55:30 2012
@@ -210,6 +210,8 @@ public class OozieCLI {
Option info = new Option(INFO_OPTION, true, "info of a job");
Option offset = new Option(OFFSET_OPTION, true, "job info offset of
actions (default '1', requires -info)");
Option len = new Option(LEN_OPTION, true, "number of actions (default
TOTAL ACTIONS, requires -info)");
+ Option filter = new Option(FILTER_OPTION, true,
+ "status=<S1>[;status=<S2>]* (All Coordinator actions
satisfying any one of the status filters will be retreived. Currently, only
supported for Coordinator job)");
Option localtime = new Option(LOCAL_TIME_OPTION, false, "use local
time (default GMT)");
Option log = new Option(LOG_OPTION, true, "job log");
Option definition = new Option(DEFINITION_OPTION, true, "job
definition");
@@ -254,6 +256,7 @@ public class OozieCLI {
jobOptions.addOption(verbose);
jobOptions.addOption(offset);
jobOptions.addOption(len);
+ jobOptions.addOption(filter);
jobOptions.addOption(action);
jobOptions.addOption(date);
jobOptions.addOption(rerun_coord);
@@ -707,7 +710,8 @@ public class OozieCLI {
int start = Integer.parseInt((s != null) ? s : "0");
s = commandLine.getOptionValue(LEN_OPTION);
int len = Integer.parseInt((s != null) ? s : "0");
-
printCoordJob(wc.getCoordJobInfo(commandLine.getOptionValue(INFO_OPTION),
start, len), options
+ String filter = commandLine.getOptionValue(FILTER_OPTION);
+
printCoordJob(wc.getCoordJobInfo(commandLine.getOptionValue(INFO_OPTION),
filter, start, len), options
.contains(LOCAL_TIME_OPTION),
options.contains(VERBOSE_OPTION));
}
else if
(commandLine.getOptionValue(INFO_OPTION).contains("-C@")) {
Modified:
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java?rev=1294403&r1=1294402&r2=1294403&view=diff
==============================================================================
---
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
(original)
+++
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
Mon Feb 27 23:55:30 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -121,7 +121,7 @@ public class OozieClient {
public static final String FILTER_FREQUENCY = "frequency";
public static final String FILTER_ID = "id";
-
+
public static final String FILTER_UNIT = "unit";
public static final String CHANGE_VALUE_ENDTIME = "endtime";
@@ -878,9 +878,9 @@ public class OozieClient {
private class CoordJobInfo extends ClientCallable<CoordinatorJob> {
- CoordJobInfo(String jobId, int start, int len) {
+ CoordJobInfo(String jobId, String filter, int start, int len) {
super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"),
prepareParams(RestConstants.JOB_SHOW_PARAM,
- RestConstants.JOB_SHOW_INFO, RestConstants.OFFSET_PARAM,
Integer.toString(start),
+ RestConstants.JOB_SHOW_INFO,
RestConstants.JOB_FILTER_PARAM, filter, RestConstants.OFFSET_PARAM,
Integer.toString(start),
RestConstants.LEN_PARAM, Integer.toString(len)));
}
@@ -958,20 +958,21 @@ public class OozieClient {
* @throws OozieClientException thrown if the job info could not be
retrieved.
*/
public CoordinatorJob getCoordJobInfo(String jobId) throws
OozieClientException {
- return new CoordJobInfo(jobId, 0, 0).call();
+ return new CoordJobInfo(jobId, null, 0, 0).call();
}
/**
* Get the info of a coordinator job and subset actions.
*
* @param jobId job Id.
+ * @param filter filter the status filter
* @param start starting index in the list of actions belonging to the job
* @param len number of actions to be returned
* @return the job info.
* @throws OozieClientException thrown if the job info could not be
retrieved.
*/
- public CoordinatorJob getCoordJobInfo(String jobId, int start, int len)
throws OozieClientException {
- return new CoordJobInfo(jobId, start, len).call();
+ public CoordinatorJob getCoordJobInfo(String jobId, String filter, int
start, int len) throws OozieClientException {
+ return new CoordJobInfo(jobId, filter, start, len).call();
}
/**
Modified:
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java?rev=1294403&r1=1294402&r2=1294403&view=diff
==============================================================================
---
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
(original)
+++
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
Mon Feb 27 23:55:30 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -44,6 +44,8 @@ public interface RestConstants {
public static final String LEN_PARAM = "len";
+ public static final String JOB_FILTER_PARAM = "filter";
+
public static final String JOB_RESOURCE = "/job";
public static final String JOB_ACTION_START = "start";
Modified:
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java?rev=1294403&r1=1294402&r2=1294403&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java
(original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BaseEngine.java
Mon Feb 27 23:55:30 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -171,12 +171,13 @@ public abstract class BaseEngine {
* Return the info about a coord job with actions subset.
*
* @param jobId job Id.
+ * @param filter the status filter
* @param start starting from this index in the list of actions belonging
to the job
* @param length number of actions to be returned
* @return the coord job info.
* @throws BaseEngineException thrown if the job info could not be
obtained.
*/
- public abstract CoordinatorJob getCoordJob(String jobId, int start, int
length) throws BaseEngineException;
+ public abstract CoordinatorJob getCoordJob(String jobId, String filter,
int start, int length) throws BaseEngineException;
/**
* Return the a job definition.
Modified:
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleEngine.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleEngine.java?rev=1294403&r1=1294402&r2=1294403&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleEngine.java
(original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleEngine.java
Mon Feb 27 23:55:30 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -118,7 +118,7 @@ public class BundleEngine extends BaseEn
* @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, int, int)
*/
@Override
- public CoordinatorJob getCoordJob(String jobId, int start, int length)
throws BundleEngineException {
+ public CoordinatorJob getCoordJob(String jobId, String filter, int start,
int length) throws BundleEngineException {
throw new BundleEngineException(new XException(ErrorCode.E0301));
}
@@ -294,17 +294,17 @@ public class BundleEngine extends BaseEn
/**
* Get bundle jobs
*
- * @param filterStr the filter string
+ * @param filter the filter string
* @param start start location for paging
* @param len total length to get
* @return bundle job info
* @throws BundleEngineException thrown if failed to get bundle job info
*/
- public BundleJobInfo getBundleJobs(String filterStr, int start, int len)
throws BundleEngineException {
- Map<String, List<String>> filter = parseFilter(filterStr);
+ public BundleJobInfo getBundleJobs(String filter, int start, int len)
throws BundleEngineException {
+ Map<String, List<String>> filterList = parseFilter(filter);
try {
- return new BundleJobsXCommand(filter, start, len).call();
+ return new BundleJobsXCommand(filterList, start, len).call();
}
catch (CommandException ex) {
throw new BundleEngineException(ex);
Modified:
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorEngine.java?rev=1294403&r1=1294402&r2=1294403&view=diff
==============================================================================
---
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
(original)
+++
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
Mon Feb 27 23:55:30 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
@@ -161,13 +162,14 @@ public class CoordinatorEngine extends B
/*
* (non-Javadoc)
*
- * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, int, int)
+ * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String,
java.lang.String, int, int)
*/
@Override
- public CoordinatorJobBean getCoordJob(String jobId, int start, int length)
throws BaseEngineException {
+ public CoordinatorJobBean getCoordJob(String jobId, String filter, int
start, int length) throws BaseEngineException {
+ List<String> filterList = parseStatusFilter(filter);
try {
if (useXCommand) {
- return new CoordJobXCommand(jobId, start, length).call();
+ return new CoordJobXCommand(jobId, filterList, start,
length).call();
}
else {
return new CoordJobCommand(jobId, start, length).call();
@@ -509,21 +511,21 @@ public class CoordinatorEngine extends B
}
/**
- * @param filterStr
+ * @param filter
* @param start
* @param len
* @return CoordinatorJobInfo
* @throws CoordinatorEngineException
*/
- public CoordinatorJobInfo getCoordJobs(String filterStr, int start, int
len) throws CoordinatorEngineException {
- Map<String, List<String>> filter = parseFilter(filterStr);
+ public CoordinatorJobInfo getCoordJobs(String filter, int start, int len)
throws CoordinatorEngineException {
+ Map<String, List<String>> filterList = parseFilter(filter);
try {
if (useXCommand) {
- return new CoordJobsXCommand(filter, start, len).call();
+ return new CoordJobsXCommand(filterList, start, len).call();
}
else {
- return new CoordJobsCommand(filter, start, len).call();
+ return new CoordJobsCommand(filterList, start, len).call();
}
}
catch (CommandException ex) {
@@ -531,6 +533,49 @@ public class CoordinatorEngine extends B
}
}
+
+ // Parses the filter string (e.g status=RUNNING;status=WAITING) and
returns a list of status values
+ private List<String> parseStatusFilter(String filter) throws
CoordinatorEngineException {
+ List<String> filterList = new ArrayList<String>();
+ if (filter != null) {
+ //split name;value pairs
+ StringTokenizer st = new StringTokenizer(filter, ";");
+ while (st.hasMoreTokens()) {
+ String token = st.nextToken();
+ if (token.contains("=")) {
+ String[] pair = token.split("=");
+ if (pair.length != 2) {
+ throw new CoordinatorEngineException(ErrorCode.E0421,
token,
+ "elements must be name=value pairs");
+ }
+ if (pair[0].equalsIgnoreCase("status")) {
+ String statusValue = pair[1];
+ try {
+ CoordinatorAction.Status.valueOf(statusValue);
+ } catch (IllegalArgumentException ex) {
+ StringBuilder validStatusList = new
StringBuilder();
+ for (CoordinatorAction.Status status:
CoordinatorAction.Status.values()){
+ validStatusList.append(status.toString()+" ");
+ }
+ // Check for incorrect status value
+ throw new
CoordinatorEngineException(ErrorCode.E0421, filter, XLog.format(
+ "invalid status value [{0}]." + " Valid
status values are: [{1}]", statusValue, validStatusList));
+ }
+ filterList.add(statusValue);
+ } else {
+ // Check for incorrect filter option
+ throw new CoordinatorEngineException(ErrorCode.E0421,
filter, XLog.format(
+ "invalid filter [{0}]." + " The only valid
filter is \"status\"", pair[0]));
+ }
+ } else {
+ throw new CoordinatorEngineException(ErrorCode.E0421,
token,
+ "elements must be name=value pairs");
+ }
+ }
+ }
+ return filterList;
+ }
+
/**
* @param filter
* @return Map<String, List<String>>
Modified:
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/DagEngine.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/DagEngine.java?rev=1294403&r1=1294402&r2=1294403&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/DagEngine.java
(original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/DagEngine.java
Mon Feb 27 23:55:30 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -104,7 +104,7 @@ public class DagEngine extends BaseEngin
*/
public DagEngine(String user, String authToken) {
this();
-
+
this.user = ParamChecker.notEmpty(user, "user");
this.authToken = ParamChecker.notEmpty(authToken, "authToken");
}
@@ -120,10 +120,10 @@ public class DagEngine extends BaseEngin
@Override
public String submitJob(Configuration conf, boolean startJob) throws
DagEngineException {
validateSubmitConfiguration(conf);
-
+
try {
String jobId;
- if (useXCommand) {
+ if (useXCommand) {
SubmitXCommand submit = new SubmitXCommand(conf,
getAuthToken());
jobId = submit.call();
}
@@ -176,7 +176,7 @@ public class DagEngine extends BaseEngin
submit = new SubmitMRCommand(conf, getAuthToken());
}
- jobId = submit.call();
+ jobId = submit.call();
}
start(jobId);
return jobId;
@@ -309,7 +309,7 @@ public class DagEngine extends BaseEngin
public void reRun(String jobId, Configuration conf) throws
DagEngineException {
try {
validateReRunConfiguration(conf);
-
+
if (useXCommand) {
new ReRunXCommand(jobId, conf, getAuthToken()).call();
}
@@ -509,20 +509,20 @@ public class DagEngine extends BaseEngin
/**
* Return the info about a set of jobs.
*
- * @param filterStr job filter. Refer to the {@link
org.apache.oozie.client.OozieClient} for the filter syntax.
+ * @param filter job filter. Refer to the {@link
org.apache.oozie.client.OozieClient} for the filter syntax.
* @param start offset, base 1.
* @param len number of jobs to return.
* @return job info for all matching jobs, the jobs don't contain node
action information.
* @throws DagEngineException thrown if the jobs info could not be
obtained.
*/
- public WorkflowsInfo getJobs(String filterStr, int start, int len) throws
DagEngineException {
- Map<String, List<String>> filter = parseFilter(filterStr);
+ public WorkflowsInfo getJobs(String filter, int start, int len) throws
DagEngineException {
+ Map<String, List<String>> filterList = parseFilter(filter);
try {
if (useXCommand) {
- return new JobsXCommand(filter, start, len).call();
+ return new JobsXCommand(filterList, start, len).call();
}
else {
- return new JobsCommand(filter, start, len).call();
+ return new JobsCommand(filterList, start, len).call();
}
}
catch (CommandException dce) {
@@ -558,7 +558,7 @@ public class DagEngine extends BaseEngin
}
@Override
- public CoordinatorJob getCoordJob(String jobId, int start, int length)
throws BaseEngineException {
+ public CoordinatorJob getCoordJob(String jobId, String filter, int start,
int length) throws BaseEngineException {
throw new BaseEngineException(new XException(ErrorCode.E0301));
}
Modified:
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java?rev=1294403&r1=1294402&r2=1294403&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
(original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
Mon Feb 27 23:55:30 2012
@@ -75,6 +75,7 @@ public enum ErrorCode {
E0404(XLog.STD, "Only one of the properties are allowed [{0}]"),
E0420(XLog.STD, "Invalid jobs filter [{0}], {1}"),
+ E0421(XLog.STD, "Invalid job filter [{0}], {1}"),
E0500(XLog.OPS, "Not authorized, {0}"),
E0501(XLog.OPS, "Could not perform authorization operation, {0}"),
Modified:
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java?rev=1294403&r1=1294402&r2=1294403&view=diff
==============================================================================
---
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java
(original)
+++
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordJobXCommand.java
Mon Feb 27 23:55:30 2012
@@ -17,6 +17,7 @@
*/
package org.apache.oozie.command.coord;
+import java.util.Collections;
import java.util.List;
import org.apache.oozie.CoordinatorActionBean;
@@ -40,6 +41,7 @@ public class CoordJobXCommand extends Co
private final boolean getActionInfo;
private int start = 1;
private int len = Integer.MAX_VALUE;
+ private List<String> filterList;
/**
* Constructor for loading a coordinator job information
@@ -47,7 +49,7 @@ public class CoordJobXCommand extends Co
* @param id coord jobId
*/
public CoordJobXCommand(String id) {
- this(id, 1, Integer.MAX_VALUE);
+ this(id, Collections.<String>emptyList(), 1, Integer.MAX_VALUE);
}
/**
@@ -57,10 +59,11 @@ public class CoordJobXCommand extends Co
* @param start starting index in the list of actions belonging to the job
* @param length number of actions to be returned
*/
- public CoordJobXCommand(String id, int start, int length) {
+ public CoordJobXCommand(String id, List<String> filterList, int start, int
length) {
super("job.info", "job.info", 1);
this.id = ParamChecker.notEmpty(id, "id");
this.getActionInfo = true;
+ this.filterList = filterList;
this.start = start;
this.len = length;
}
@@ -119,7 +122,7 @@ public class CoordJobXCommand extends Co
coordJob = jpaService.execute(new CoordJobGetJPAExecutor(id));
if (getActionInfo) {
List<CoordinatorActionBean> coordActions = jpaService
- .execute(new
CoordJobGetActionsSubsetJPAExecutor(id, start, len));
+ .execute(new
CoordJobGetActionsSubsetJPAExecutor(id, filterList, start, len));
coordJob.setActions(coordActions);
}
}
Modified:
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java?rev=1294403&r1=1294402&r2=1294403&view=diff
==============================================================================
---
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java
(original)
+++
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsSubsetJPAExecutor.java
Mon Feb 27 23:55:30 2012
@@ -35,14 +35,17 @@ public class CoordJobGetActionsSubsetJPA
private String coordJobId = null;
private int start = 1;
private int len = 50;
+ private List<String> filterList;
public CoordJobGetActionsSubsetJPAExecutor(String coordJobId) {
ParamChecker.notNull(coordJobId, "coordJobId");
this.coordJobId = coordJobId;
}
- public CoordJobGetActionsSubsetJPAExecutor(String coordJobId, int start,
int len) {
+ public CoordJobGetActionsSubsetJPAExecutor(String coordJobId, List<String>
filterList, int start, int len) {
this(coordJobId);
+ ParamChecker.notNull(filterList, "filterList");
+ this.filterList = filterList;
this.start = start;
this.len = len;
}
@@ -59,10 +62,22 @@ public class CoordJobGetActionsSubsetJPA
List<CoordinatorActionBean> actionList = new
ArrayList<CoordinatorActionBean>();
try {
Query q =
em.createNamedQuery("GET_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME");
+ if (!filterList.isEmpty()) {
+ // Add the filter clause
+ String query = q.toString();
+ StringBuilder sbTotal = new StringBuilder(query);
+ int offset = query.lastIndexOf("order");
+ // Get the 'where' clause for status filters
+ StringBuilder statusClause = getStatusClause(filterList);
+ // Insert 'where' before 'order by'
+ sbTotal.insert(offset, statusClause);
+ q = em.createQuery(sbTotal.toString());
+ }
q.setParameter("jobId", coordJobId);
q.setFirstResult(start - 1);
q.setMaxResults(len);
actions = q.getResultList();
+
for (CoordinatorActionBean a : actions) {
CoordinatorActionBean aa = getBeanForRunningCoordAction(a);
actionList.add(aa);
@@ -74,6 +89,23 @@ public class CoordJobGetActionsSubsetJPA
return actionList;
}
+ // Form the where clause to filter by status values
+ private StringBuilder getStatusClause(List<String> filterList) {
+ StringBuilder sb = new StringBuilder();
+ boolean isStatus = false;
+ for (String statusVal : filterList) {
+ if (!isStatus) {
+ sb.append(" and a.status IN (\'" + statusVal + "\'");
+ isStatus = true;
+ }
+ else {
+ sb.append(",\'" + statusVal + "\'");
+ }
+ }
+ sb.append(") ");
+ return sb;
+ }
+
private CoordinatorActionBean
getBeanForRunningCoordAction(CoordinatorActionBean a) {
if (a != null) {
CoordinatorActionBean action = new CoordinatorActionBean();
Modified:
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java?rev=1294403&r1=1294402&r2=1294403&view=diff
==============================================================================
---
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
(original)
+++
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
Mon Feb 27 23:55:30 2012
@@ -715,14 +715,15 @@ public class V1JobServlet extends BaseJo
String jobId = getResourceName(request);
String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
String lenStr = request.getParameter(RestConstants.LEN_PARAM);
+ String filter = request.getParameter(RestConstants.JOB_FILTER_PARAM);
int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
start = (start < 1) ? 1 : start;
// Get default number of coordinator actions to be retrieved
int defaultLen =
Services.get().getConf().getInt(COORD_ACTIONS_DEFAULT_LENGTH, 1000);
int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
len = (len < 1) ? defaultLen : len;
- try {
- JsonCoordinatorJob coordJob = coordEngine.getCoordJob(jobId,
start, len);
+ try {
+ JsonCoordinatorJob coordJob = coordEngine.getCoordJob(jobId,
filter, start, len);
jobBean = coordJob;
}
catch (CoordinatorEngineException ex) {
Modified:
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java?rev=1294403&r1=1294402&r2=1294403&view=diff
==============================================================================
---
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
(original)
+++
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
Mon Feb 27 23:55:30 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -451,7 +451,69 @@ public class TestCoordinatorEngine exten
private void _testSubsetActions(final String jobId) throws Exception {
CoordinatorEngine ce = new CoordinatorEngine(getTestUser(),
"UNIT_TESTING");
- CoordinatorJob job = ce.getCoordJob(jobId, 1, 2);
+ // Check for WAITING filter
+ CoordinatorJob job = ce.getCoordJob(jobId, "status=WAITING", 1, 2);
+ // As both actions are waiting, expected result size is 2
assertEquals(job.getActions().size(), 2);
+
+ job = ce.getCoordJob(jobId, "status=RUNNING", 1, 2);
+ assertEquals(job.getActions().size(), 0);
+
+ //Check for actions WAITING OR RUNNING
+ job = ce.getCoordJob(jobId, "status=RUNNING;status=WAITING", 1, 2);
+ assertEquals(job.getActions().size(), 2);
+
+ //Check without filters
+ job = ce.getCoordJob(jobId, null, 1, 2);
+ assertEquals(job.getActions().size(), 2);
+
+ //Check for empty filter list
+ job = ce.getCoordJob(jobId, "", 1, 2);
+ assertEquals(job.getActions().size(), 2);
+
+ //Check for missing "="
+ try {
+ job = ce.getCoordJob(jobId, "statusRUNNING", 1, 2);
+ }
+ catch (CoordinatorEngineException ex) {
+ assertEquals(ErrorCode.E0421, ex.getErrorCode());
+ assertEquals("E0421: Invalid job filter [statusRUNNING], elements
must be name=value pairs", ex.getMessage());
+ }
+
+ //Check for missing value after "="
+ try {
+ job = ce.getCoordJob(jobId, "status=", 1, 2);
+ }
+ catch (CoordinatorEngineException ex) {
+ assertEquals(ErrorCode.E0421, ex.getErrorCode());
+ assertEquals("E0421: Invalid job filter [status=], elements must
be name=value pairs", ex.getMessage());
+ }
+
+ // Check for invalid status value
+ try {
+ job = ce.getCoordJob(jobId, "status=blahblah", 1, 2);
+ }
+ catch (CoordinatorEngineException ex) {
+ assertEquals(ErrorCode.E0421, ex.getErrorCode());
+ assertEquals("E0421: Invalid job filter [status=blahblah], invalid
status value [blahblah]. Valid status values are: [WAITING READY SUBMITTED
RUNNING SUSPENDED TIMEDOUT SUCCEEDED KILLED FAILED DISCARDED ]",
ex.getMessage());
+ }
+
+ // Check for empty status value
+ try {
+ job = ce.getCoordJob(jobId, "status=\"\"", 1, 2);
+ }
+ catch (CoordinatorEngineException ex) {
+ assertEquals(ErrorCode.E0421, ex.getErrorCode());
+ assertEquals("E0421: Invalid job filter [status=\"\"], invalid
status value [\"\"]. Valid status values are: [WAITING READY SUBMITTED RUNNING
SUSPENDED TIMEDOUT SUCCEEDED KILLED FAILED DISCARDED ]", ex.getMessage());
+ }
+
+ // Check for invalid filter option
+ try {
+ job = ce.getCoordJob(jobId, "blahblah=blahblah", 1, 2);
+ }
+ catch (CoordinatorEngineException ex) {
+ assertEquals(ErrorCode.E0421, ex.getErrorCode());
+ assertEquals("E0421: Invalid job filter [blahblah=blahblah],
invalid filter [blahblah]. The only valid filter is \"status\"",
ex.getMessage());
+ }
}
}
Modified:
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsSubsetJPAExecutor.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsSubsetJPAExecutor.java?rev=1294403&r1=1294402&r2=1294403&view=diff
==============================================================================
---
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsSubsetJPAExecutor.java
(original)
+++
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetActionsSubsetJPAExecutor.java
Mon Feb 27 23:55:30 2012
@@ -17,6 +17,8 @@
*/
package org.apache.oozie.executor.jpa;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.oozie.CoordinatorActionBean;
@@ -58,7 +60,7 @@ public class TestCoordJobGetActionsSubse
private void _testGetActionsSubset(String jobId, String actionId, int
start, int len) throws Exception {
JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
- CoordJobGetActionsSubsetJPAExecutor actionGetCmd = new
CoordJobGetActionsSubsetJPAExecutor(jobId, start, len);
+ CoordJobGetActionsSubsetJPAExecutor actionGetCmd = new
CoordJobGetActionsSubsetJPAExecutor(jobId, Collections.<String>emptyList(),
start, len);
List<CoordinatorActionBean> actions = jpaService.execute(actionGetCmd);
assertEquals(actions.size(), 1);
assertEquals(actions.get(0).getId(), actionId);
@@ -78,11 +80,40 @@ public class TestCoordJobGetActionsSubse
private void _testGetActionsSubsetOrderBy(String jobId, int actionNum, int
start, int len) throws Exception {
JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
- CoordJobGetActionsSubsetJPAExecutor actionGetCmd = new
CoordJobGetActionsSubsetJPAExecutor(jobId, start, len);
+ CoordJobGetActionsSubsetJPAExecutor actionGetCmd = new
CoordJobGetActionsSubsetJPAExecutor(jobId, Collections.<String>emptyList(),
start, len);
List<CoordinatorActionBean> actions = jpaService.execute(actionGetCmd);
assertEquals(actions.size(), 2);
// As actions are sorted by nominal time, the first action should be
with action number 2
assertEquals(actions.get(0).getActionNumber(), actionNum);
}
+ // Check status filters for Coordinator actions
+ public void testCoordActionFilter() throws Exception{
+ CoordinatorJobBean job =
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+ // Add Coordinator action with nominal time: 2009-12-15T01:00Z
+ addRecordToCoordActionTable(job.getId(), 1,
CoordinatorAction.Status.RUNNING,
+ "coord-action-get.xml", 0);
+ // Add Coordinator action with nominal time: 2009-02-01T23:59Z
+ addRecordToCoordActionTable(job.getId(), 2,
CoordinatorAction.Status.WAITING,
+ "coord-action-get.xml", 0);
+ // Create lists for status filter
+ List<String> filterList = new ArrayList<String>();
+ filterList.add("RUNNING");
+ filterList.add("KILLED");
+ _testGetActionsSubsetFilter(job.getId(), 1, filterList, 1, 2);
+ }
+
+ // Check whether actions are retrieved based on the filter values for
status
+ private void _testGetActionsSubsetFilter(String jobId, int actionNum,
List<String> filterList, int start, int len)
+ throws JPAExecutorException {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+ CoordJobGetActionsSubsetJPAExecutor actionGetCmd = new
CoordJobGetActionsSubsetJPAExecutor(jobId, filterList,
+ start, len);
+ List<CoordinatorActionBean> actions = jpaService.execute(actionGetCmd);
+ // As actions are filtered by RUNNING status, only 1 action should be
returned
+ assertEquals(actions.size(), 1);
+ assertEquals(actions.get(0).getActionNumber(), 1);
+ }
+
}
Modified:
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockCoordinatorEngineService.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockCoordinatorEngineService.java?rev=1294403&r1=1294402&r2=1294403&view=diff
==============================================================================
---
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockCoordinatorEngineService.java
(original)
+++
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/MockCoordinatorEngineService.java
Mon Feb 27 23:55:30 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -161,7 +161,7 @@ public class MockCoordinatorEngineServic
}
@Override
- public CoordinatorJobBean getCoordJob(String jobId, int start, int
length) throws BaseEngineException {
+ public CoordinatorJobBean getCoordJob(String jobId, String filter, int
start, int length) throws BaseEngineException {
did = RestConstants.JOB_SHOW_INFO;
int idx = validateCoordinatorIdx(jobId);
return (CoordinatorJobBean) coordJobs.get(idx);
Modified: incubator/oozie/trunk/docs/src/site/twiki/DG_CommandLineTool.twiki
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/docs/src/site/twiki/DG_CommandLineTool.twiki?rev=1294403&r1=1294402&r2=1294403&view=diff
==============================================================================
--- incubator/oozie/trunk/docs/src/site/twiki/DG_CommandLineTool.twiki
(original)
+++ incubator/oozie/trunk/docs/src/site/twiki/DG_CommandLineTool.twiki Mon Feb
27 23:55:30 2012
@@ -368,6 +368,11 @@ The =localtime= option displays times in
The =verbose= option gives more detailed information for all the actions, if
checking for workflow job or coordinator job.
+The =filter= option can be used to filter coordinator actions based on their
status.
+The filter option syntax is: <code>[status=VALUE][;status=VALUE]*</code>.
+Multiple values must be specified as different name value pairs. When multiple
filters are specified, all Coordinator actions that satisfy any one of the
filters will be retrieved. (The query will do an OR among all the filter values
for the status)
+Currently, the filter option can be used only with an =info= option on
Coordinator job.
+
---+++ Checking the xml definition of a Workflow, Coordinator or Bundle Job
Example:
Modified: incubator/oozie/trunk/release-log.txt
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/release-log.txt?rev=1294403&r1=1294402&r2=1294403&view=diff
==============================================================================
--- incubator/oozie/trunk/release-log.txt (original)
+++ incubator/oozie/trunk/release-log.txt Mon Feb 27 23:55:30 2012
@@ -1,5 +1,6 @@
-- Oozie 3.2.0 release
+Oozie-677 Add Filter API for status on coordinator actions (Virag Kothari via
Angelo)
OOZIE-721 remove obsolete/not-used JobServlet and JobsServlet code (tucu)
OOZIE-626 Roll the oozie log file in GZ format (Kiran Nagasubramanian via
Angelo)
OOZIE-693 Fork-join validation doesn't check for the 'error to' transition of
nodes (Virag Kothari via Angelo)