Repository: oozie
Updated Branches:
  refs/heads/master f75ad795e -> cb52b495f


OOZIE-3173 Coordinator job with frequency using cron syntax creates only one action in catchup mode (andras.piros)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/cb52b495
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/cb52b495
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/cb52b495

Branch: refs/heads/master
Commit: cb52b495f87b599e49eb10a542defea4ddb3173f
Parents: f75ad79
Author: Andras Piros <andras.pi...@cloudera.com>
Authored: Fri Feb 9 14:01:53 2018 -0300
Committer: Andras Piros <andras.pi...@cloudera.com>
Committed: Fri Feb 9 14:01:53 2018 -0300

----------------------------------------------------------------------
 .../CoordMaterializeTransitionXCommand.java     |   8 +-
 .../TestCoordMaterializeTransitionXCommand.java | 107 +++++++++++++++++++
 release-log.txt                                 |   1 +
 3 files changed, 113 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/cb52b495/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
index 91d3508..2b91253 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
@@ -246,12 +246,14 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
                 coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.NONE)) {
             return new Date();
         }
-        int frequency = 0;
+        final int frequency;
         try {
             frequency = Integer.parseInt(coordJob.getFrequency());
         }
-        catch (NumberFormatException e) {
-            return currentMatTime;
+        catch (final NumberFormatException e) {
+            // Cron based frequency: catching up at maximum till the coordinator job's end time,
+            // bounded also by the throttle parameter, aka the number of coordinator actions to materialize
+            return coordJob.getEndTime();
         }
 
         TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone());

http://git-wip-us.apache.org/repos/asf/oozie/blob/cb52b495/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
index 2a648d7..61bbbfe 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
@@ -328,6 +328,113 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
             fail("Job ID " + job.getId() + " was not stored properly in db");
         }
     }
+
+    public void testCronFrequencyCatchupThrottleLessThanDuration() throws Exception {
+        final String startInThePast = "2013-03-10T08:00Z";
+        final String startPlusOneDay = "2013-03-11T08:00Z";
+        final Date startTime = DateUtils.parseDateOozieTZ(startInThePast);
+        final Date endTime = DateUtils.parseDateOozieTZ(startPlusOneDay);
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
+        job.setNextMaterializedTime(startTime);
+        job.setMatThrottling(3);
+        final String everyHour = "0 * * * *";
+        job.setFrequency(everyHour);
+        job.setTimeUnit(Timeunit.CRON);
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
+
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+        final String startPlusOneHour = "2013-03-10T09:00Z";
+        final String startPlusTwoHours = "2013-03-10T10:00Z";
+        final Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ(startInThePast),
+                DateUtils.parseDateOozieTZ(startPlusOneHour),
+                DateUtils.parseDateOozieTZ(startPlusTwoHours)};
+        checkCoordActionsNominalTime(job.getId(), 3, nominalTimes);
+
+        try {
+            final JPAService jpaService = Services.get().get(JPAService.class);
+            job =  jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+            assertFalse("coordinator job shouldn't have yet been materialized", job.isDoneMaterialization());
+            assertEquals("coordinator action count mismatch", 3, job.getLastActionNumber());
+            final String startPlusThreeHours = "2013-03-10T11:00Z";
+            assertEquals("coordinator next materialization time mismatch",
+                    DateUtils.parseDateOozieTZ(startPlusThreeHours), job.getNextMaterializedTime());
+        }
+        catch (final JPAExecutorException se) {
+            se.printStackTrace();
+            fail("Job ID " + job.getId() + " was not stored properly in db");
+        }
+    }
+
+    public void testCronFrequencyCatchupThrottleEqualsDuration() throws Exception {
+        final String startInThePast = "2013-03-10T08:00Z";
+        final Date startTime = DateUtils.parseDateOozieTZ(startInThePast);
+        final String startPlusTwoHoursAndSome = "2013-03-10T10:01Z";
+        final Date endTime = DateUtils.parseDateOozieTZ(startPlusTwoHoursAndSome);
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
+        job.setNextMaterializedTime(startTime);
+        job.setMatThrottling(3);
+        final String everyHour = "0 * * * *";
+        job.setFrequency(everyHour);
+        job.setTimeUnit(Timeunit.CRON);
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
+
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+        final String startPlusOneHour = "2013-03-10T09:00Z";
+        final String startPlusTwoHours = "2013-03-10T10:00Z";
+        final Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ(startInThePast),
+                DateUtils.parseDateOozieTZ(startPlusOneHour),
+                DateUtils.parseDateOozieTZ(startPlusTwoHours)};
+        checkCoordActionsNominalTime(job.getId(), 3, nominalTimes);
+
+        try {
+            final JPAService jpaService = Services.get().get(JPAService.class);
+            job =  jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+            assertTrue("coordinator job should have already been materialized", job.isDoneMaterialization());
+            assertEquals("coordinator action count mismatch", 3, job.getLastActionNumber());
+            final String startPlusThreeHours = "2013-03-10T11:00Z";
+            assertEquals("coordinator next materialization time mismatch",
+                    DateUtils.parseDateOozieTZ(startPlusThreeHours), job.getNextMaterializedTime());
+        }
+        catch (final JPAExecutorException se) {
+            se.printStackTrace();
+            fail("Job ID " + job.getId() + " was not stored properly in db");
+        }
+    }
+
+    public void testCronFrequencyCatchupThrottleMoreThanDuration() throws Exception {
+        final String startInThePast = "2013-03-10T08:00Z";
+        final Date startTime = DateUtils.parseDateOozieTZ(startInThePast);
+        final String startPlusOneHourAndSome = "2013-03-10T09:01Z";
+        final Date endTime = DateUtils.parseDateOozieTZ(startPlusOneHourAndSome);
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
+        job.setNextMaterializedTime(startTime);
+        job.setMatThrottling(5);
+        final String everyHour = "0 * * * *";
+        job.setFrequency(everyHour);
+        job.setTimeUnit(Timeunit.CRON);
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
+
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+        final String startPlusOneHour = "2013-03-10T09:00Z";
+        final Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ(startInThePast),
+                DateUtils.parseDateOozieTZ(startPlusOneHour)};
+        checkCoordActionsNominalTime(job.getId(), 2, nominalTimes);
+
+        try {
+            final JPAService jpaService = Services.get().get(JPAService.class);
+            job =  jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+            assertTrue("coordinator job should have already been materialized", job.isDoneMaterialization());
+            assertEquals("coordinator action count mismatch", 2, job.getLastActionNumber());
+            final String startPlusTwoHours = "2013-03-10T10:00Z";
+            assertEquals("coordinator next materialization time mismatch",
+                    DateUtils.parseDateOozieTZ(startPlusTwoHours), job.getNextMaterializedTime());
+        }
+        catch (final JPAExecutorException se) {
+            se.printStackTrace();
+            fail("Job ID " + job.getId() + " was not stored properly in db");
+        }
+    }
+
     public void testActionMaterWithDST1() throws Exception {
         Date startTime = DateUtils.parseDateOozieTZ("2013-03-10T08:00Z");
         Date endTime = DateUtils.parseDateOozieTZ("2013-03-10T12:00Z");

http://git-wip-us.apache.org/repos/asf/oozie/blob/cb52b495/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 05b9ee1..d7a6ed1 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.0.0 release (trunk - unreleased)
 
+OOZIE-3173 Coordinator job with frequency using cron syntax creates only one action in catchup mode (andras.piros)
 OOZIE-3121 bump all maven plugins to latest versions (dbist13 via gezapeti)
 OOZIE-3163 Improve documentation rendering: use fluido skin and better config (hboutemy via andras.piros)
 OOZIE-2847 Oozie Ha timing issue (dionusos via asasvari)

Reply via email to