Author: kamrul
Date: Sun Dec 11 01:50:48 2011
New Revision: 1212958
URL: http://svn.apache.org/viewvc?rev=1212958&view=rev
Log:
OOZIE-621 Option to add time unit for frequency for coordinator jobs
filtering. (Kiran via Mohammad)
Modified:
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/store/StoreStatusFilter.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobInfoGetJPAExecutor.java
incubator/oozie/trunk/release-log.txt
Modified:
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java?rev=1212958&r1=1212957&r2=1212958&view=diff
==============================================================================
---
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
(original)
+++
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
Sun Dec 11 01:50:48 2011
@@ -120,6 +120,8 @@ public class OozieClient {
public static final String FILTER_FREQUENCY = "frequency";
public static final String FILTER_ID = "id";
+
+ public static final String FILTER_UNIT = "unit";
public static final String CHANGE_VALUE_ENDTIME = "endtime";
Modified:
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorEngine.java?rev=1212958&r1=1212957&r2=1212958&view=diff
==============================================================================
---
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
(original)
+++
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
Sun Dec 11 01:50:48 2011
@@ -505,6 +505,7 @@ public class CoordinatorEngine extends B
FILTER_NAMES.add(OozieClient.FILTER_STATUS);
FILTER_NAMES.add(OozieClient.FILTER_ID);
FILTER_NAMES.add(OozieClient.FILTER_FREQUENCY);
+ FILTER_NAMES.add(OozieClient.FILTER_UNIT);
}
/**
@@ -537,6 +538,10 @@ public class CoordinatorEngine extends B
*/
private Map<String, List<String>> parseFilter(String filter) throws
CoordinatorEngineException {
Map<String, List<String>> map = new HashMap<String, List<String>>();
+ boolean isTimeUnitSpecified = false;
+ String timeUnit = "MINUTE";
+ boolean isFrequencySpecified = false;
+ String frequency = "";
if (filter != null) {
StringTokenizer st = new StringTokenizer(filter, ";");
while (st.hasMoreTokens()) {
@@ -547,10 +552,32 @@ public class CoordinatorEngine extends B
throw new CoordinatorEngineException(ErrorCode.E0420,
filter,
"elements must be name=value pairs");
}
- if (!FILTER_NAMES.contains(pair[0])) {
+ if (!FILTER_NAMES.contains(pair[0].toLowerCase())) {
throw new CoordinatorEngineException(ErrorCode.E0420,
filter, XLog.format("invalid name [{0}]",
pair[0]));
}
+ if (pair[0].equalsIgnoreCase("frequency")) {
+ isFrequencySpecified = true;
+ try {
+ frequency = (int) Float.parseFloat(pair[1]) + "";
+ continue;
+ }
+ catch (NumberFormatException NANException) {
+ throw new
CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
+ "invalid value [{0}] for frequency. A
numerical value is expected", pair[1]));
+ }
+ }
+ if (pair[0].equalsIgnoreCase("unit")) {
+ isTimeUnitSpecified = true;
+ timeUnit = pair[1];
+ if (!timeUnit.equalsIgnoreCase("months") &&
!timeUnit.equalsIgnoreCase("days")
+ && !timeUnit.equalsIgnoreCase("hours") &&
!timeUnit.equalsIgnoreCase("minutes")) {
+ throw new
CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
+ "invalid value [{0}] for time unit. "
+ + "Valid value is one of months,
days, hours or minutes", pair[1]));
+ }
+ continue;
+ }
if (pair[0].equals("status")) {
try {
CoordinatorJob.Status.valueOf(pair[1]);
@@ -566,11 +593,38 @@ public class CoordinatorEngine extends B
map.put(pair[0], list);
}
list.add(pair[1]);
- }
- else {
+ } else {
throw new CoordinatorEngineException(ErrorCode.E0420,
filter, "elements must be name=value pairs");
}
}
+ // Unit is specified and frequency is not specified
+ if (!isFrequencySpecified && isTimeUnitSpecified) {
+ throw new CoordinatorEngineException(ErrorCode.E0420, filter,
"time unit should be added only when "
+ + "frequency is specified. Either specify frequency
also or else remove the time unit");
+ } else if (isFrequencySpecified) {
+ // Frequency value is specified
+ if (isTimeUnitSpecified) {
+ if (timeUnit.equalsIgnoreCase("months")) {
+ timeUnit = "MONTH";
+ } else if (timeUnit.equalsIgnoreCase("days")) {
+ timeUnit = "DAY";
+ } else if (timeUnit.equalsIgnoreCase("hours")) {
+ // When job details are persisted to database,
frequency in hours are converted to minutes.
+ // This conversion is to conform with that.
+ frequency = Integer.parseInt(frequency) * 60 + "";
+ timeUnit = "MINUTE";
+ } else if (timeUnit.equalsIgnoreCase("minutes")) {
+ timeUnit = "MINUTE";
+ }
+ }
+ // Adding the frequency and time unit filters to the filter map
+ List<String> list = new ArrayList<String>();
+ list.add(timeUnit);
+ map.put("unit", list);
+ list = new ArrayList<String>();
+ list.add(frequency);
+ map.put("frequency", list);
+ }
}
return map;
}
Modified:
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/store/StoreStatusFilter.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/store/StoreStatusFilter.java?rev=1212958&r1=1212957&r2=1212958&view=diff
==============================================================================
---
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/store/StoreStatusFilter.java
(original)
+++
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/store/StoreStatusFilter.java
Sun Dec 11 01:50:48 2011
@@ -44,6 +44,7 @@ public class StoreStatusFilter {
boolean isEnabled = false;
boolean isFrequency = false;
boolean isId = false;
+ boolean isUnit = false;
int index = 0;
@@ -239,6 +240,40 @@ public class StoreStatusFilter {
colArray.add(colVar);
}
}
+ // Filter map has time unit filter specified
+ else if
(entry.getKey().equals(OozieClient.FILTER_UNIT)) {
+ List<String> values =
filter.get(OozieClient.FILTER_UNIT);
+ colName = "timeUnitStr";
+ for (int i = 0; i < values.size(); ++i) {
+ colVar = colName + index;
+ // This unit filter value is the first
condition to be added to the where clause of
+ // query
+ if (!isEnabled && !isUnit) {
+ sb.append(seletStr).append(" where
w.timeUnitStr IN (:timeUnitStr" + index);
+ isUnit = true;
+ isEnabled = true;
+ } else {
+ // Unit filter is neither the first nor
the last condition to be added to the where
+ // clause of query
+ if (isEnabled && !isUnit) {
+ sb.append(" and w.timeUnitStr IN
(:timeUnitStr" + index);
+ isUnit = true;
+ } else {
+ if (isUnit) {
+ sb.append(", :timeUnitStr" +
index);
+ }
+ }
+ }
+ // This unit filter value is the last
condition to be added to the where clause of query
+ if (i == values.size() - 1) {
+ sb.append(")");
+ }
+ ++index;
+ valArray.add(values.get(i));
+ orArray.add(colName);
+ colArray.add(colVar);
+ }
+ }
}
}
}
Modified:
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java?rev=1212958&r1=1212957&r2=1212958&view=diff
==============================================================================
---
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
(original)
+++
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
Sun Dec 11 01:50:48 2011
@@ -167,7 +167,6 @@ public class TestCoordinatorEngine exten
}
});
-
List<CoordinatorAction> actions = ce.getCoordJob(jobId).getActions();
assertTrue(actions.size() > 0);
CoordinatorAction action = actions.get(0);
@@ -176,7 +175,6 @@ public class TestCoordinatorEngine exten
assertEquals("file://" + getTestCaseDir() +
"/workflows/2009/02/01/consume_me", missingDeps);
}
-
/**
* Test Missing Dependencies with Empty Done Flag in Schema
*
@@ -224,7 +222,6 @@ public class TestCoordinatorEngine exten
}
});
-
List<CoordinatorAction> actions = ce.getCoordJob(jobId).getActions();
assertTrue(actions.size() > 0);
CoordinatorAction action = actions.get(0);
@@ -233,7 +230,6 @@ public class TestCoordinatorEngine exten
assertEquals("file://" + getTestCaseDir() + "/workflows/2009/02/01",
missingDeps);
}
-
/**
* Test Missing Dependencies with Done Flag in Schema
*
@@ -329,7 +325,7 @@ public class TestCoordinatorEngine exten
conf.set(OozieClient.USER_NAME, getTestUser());
conf.set(OozieClient.GROUP_NAME, "other");
injectKerberosInfo(conf);
-
+
final CoordinatorEngine ce = new CoordinatorEngine(getTestUser(),
"UNIT_TESTING");
final String jobId = ce.submitJob(conf, true);
waitFor(5000, new Predicate() {
@@ -355,14 +351,46 @@ public class TestCoordinatorEngine exten
assertEquals(job.getAppPath(), appPath);
}
+ /**
+ * Test to validate frequency and time unit filters for jobs
+ *
+ * @throws Exception
+ */
public void _testGetJobs(String jobId) throws Exception {
CoordinatorEngine ce = new CoordinatorEngine(getTestUser(),
"UNIT_TESTING");
- CoordinatorJobInfo jobInfo = ce.getCoordJobs("", 1, 10); // TODO: use
- // valid
- // filter
+ // Test with no job filter specified
+ CoordinatorJobInfo jobInfo = ce.getCoordJobs("", 1, 10);
assertEquals(1, jobInfo.getCoordJobs().size());
CoordinatorJob job = jobInfo.getCoordJobs().get(0);
assertEquals(jobId, job.getId());
+
+ // Test specifying the value for unit but leaving out the value for
frequency
+ try {
+ jobInfo = ce.getCoordJobs("unit=minutes", 1, 10);
+ }
+ catch (CoordinatorEngineException ex) {
+ assertEquals("E0420: Invalid jobs filter [unit=minutes], time unit
should be added only when "
+ + "frequency is specified. Either specify frequency also
or else remove the time unit", ex
+ .getMessage());
+ }
+
+ // Test for invalid frequency value(Non-numeric value)
+ try {
+ jobInfo = ce.getCoordJobs("frequency=ghj;unit=minutes", 1, 10);
+ }
+ catch (CoordinatorEngineException ex) {
+ assertEquals("E0420: Invalid jobs filter
[frequency=ghj;unit=minutes], "
+ + "invalid value [ghj] for frequency. A numerical value is
expected", ex.getMessage());
+ }
+
+ // Test for invalid unit value(Other than months, days, minutes or
hours)
+ try {
+ jobInfo = ce.getCoordJobs("frequency=60;unit=min", 1, 10);
+ }
+ catch (CoordinatorEngineException ex) {
+ assertEquals("E0420: Invalid jobs filter [frequency=60;unit=min],
invalid value [min] for time unit. "
+ + "Valid value is one of months, days, hours or minutes",
ex.getMessage());
+ }
}
private void _testGetDefinition(String jobId) throws Exception {
Modified:
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobInfoGetJPAExecutor.java
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobInfoGetJPAExecutor.java?rev=1212958&r1=1212957&r2=1212958&view=diff
==============================================================================
---
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobInfoGetJPAExecutor.java
(original)
+++
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobInfoGetJPAExecutor.java
Sun Dec 11 01:50:48 2011
@@ -61,6 +61,7 @@ public class TestCoordJobInfoGetJPAExecu
_testGetJobInfoForUserAndStatus();
_testGetJobInfoForFrequency();
_testGetJobInfoForId(coordinatorJob1.getId());
+ _testGetJobInfoForFrequencyAndUnit();
}
private void _testGetJobInfoForStatus() throws Exception {
@@ -165,4 +166,51 @@ public class TestCoordJobInfoGetJPAExecu
assertEquals(ret.getCoordJobs().size(), 1);
}
+ /**
+ * Test to verify various combinations of frequency and time unit filters
for jobs
+ *
+ * @throws Exception
+ */
+ private void _testGetJobInfoForFrequencyAndUnit() throws Exception {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+
+ // Test specifying frequency value as 1 minute
+ Map<String, List<String>> filter = new HashMap<String, List<String>>();
+ List<String> unitList = new ArrayList<String>();
+ List<String> frequencyList = new ArrayList<String>();
+ unitList.add("MINUTE");
+ filter.put(OozieClient.FILTER_UNIT, unitList);
+ frequencyList = new ArrayList<String>();
+ frequencyList.add("1");
+ filter.put(OozieClient.FILTER_FREQUENCY, frequencyList);
+ CoordJobInfoGetJPAExecutor coordInfoGetCmd = new
CoordJobInfoGetJPAExecutor(filter, 1, 20);
+ CoordinatorJobInfo ret = jpaService.execute(coordInfoGetCmd);
+ assertNotNull(ret);
+ assertEquals(ret.getCoordJobs().size(), 0);
+ frequencyList.remove(0);
+ unitList.remove(0);
+
+ // Test specifying frequency value as 3 days
+ unitList.add("DAY");
+ filter.put(OozieClient.FILTER_UNIT, unitList);
+ frequencyList.add("3");
+ filter.put(OozieClient.FILTER_FREQUENCY, frequencyList);
+ coordInfoGetCmd = new CoordJobInfoGetJPAExecutor(filter, 1, 20);
+ ret = jpaService.execute(coordInfoGetCmd);
+ assertNotNull(ret);
+ assertEquals(ret.getCoordJobs().size(), 0);
+ frequencyList.remove(0);
+ unitList.remove(0);
+
+ // Test specifying frequency value as 1 day
+ unitList.add("DAY");
+ filter.put(OozieClient.FILTER_UNIT, unitList);
+ frequencyList.add("1");
+ filter.put(OozieClient.FILTER_FREQUENCY, frequencyList);
+ coordInfoGetCmd = new CoordJobInfoGetJPAExecutor(filter, 1, 20);
+ ret = jpaService.execute(coordInfoGetCmd);
+ assertNotNull(ret);
+ assertEquals(ret.getCoordJobs().size(), 3);
+ }
}
Modified: incubator/oozie/trunk/release-log.txt
URL:
http://svn.apache.org/viewvc/incubator/oozie/trunk/release-log.txt?rev=1212958&r1=1212957&r2=1212958&view=diff
==============================================================================
--- incubator/oozie/trunk/release-log.txt (original)
+++ incubator/oozie/trunk/release-log.txt Sun Dec 11 01:50:48 2011
@@ -1,5 +1,6 @@
-- Oozie 3.2.0 release
+OOZIE-621 Option to add timeunit for frequency for coordinator jobs
filtering.(Kiran via Mohammad)
OOZIE-627 Handle GZ log retrieval failures gracefully.(Kiran via Mohammad)
OOZIE-8 Variable not getting replaced with value in workflow rerun.(Mona via
Mohammad)
OOZIE-15 Coordinator input/output event instance should limit 1 instance.
(Mona via Mohammad)