Repository: oozie Updated Branches: refs/heads/master f75ad795e -> cb52b495f
OOZIE-3173 Coordinator job with frequency using cron syntax creates only one action in catchup mode (andras.piros) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/cb52b495 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/cb52b495 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/cb52b495 Branch: refs/heads/master Commit: cb52b495f87b599e49eb10a542defea4ddb3173f Parents: f75ad79 Author: Andras Piros <andras.pi...@cloudera.com> Authored: Fri Feb 9 14:01:53 2018 -0300 Committer: Andras Piros <andras.pi...@cloudera.com> Committed: Fri Feb 9 14:01:53 2018 -0300 ---------------------------------------------------------------------- .../CoordMaterializeTransitionXCommand.java | 8 +- .../TestCoordMaterializeTransitionXCommand.java | 107 +++++++++++++++++++ release-log.txt | 1 + 3 files changed, 113 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/cb52b495/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java index 91d3508..2b91253 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java @@ -246,12 +246,14 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.NONE)) { return new Date(); } - int frequency = 0; + final int frequency; try { frequency = Integer.parseInt(coordJob.getFrequency()); } - catch (NumberFormatException e) { - return currentMatTime; + catch (final NumberFormatException e) { + // Cron based frequency: catching up at maximum till the coordinator job's end time, + // bounded also by the throttle parameter, aka the number of coordinator actions to materialize + return coordJob.getEndTime(); } TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone()); http://git-wip-us.apache.org/repos/asf/oozie/blob/cb52b495/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java index 2a648d7..61bbbfe 100644 --- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java @@ -328,6 +328,113 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase { fail("Job ID " + job.getId() + " was not stored properly in db"); } } + + public void testCronFrequencyCatchupThrottleLessThanDuration() throws Exception { + final String startInThePast = "2013-03-10T08:00Z"; + final String startPlusOneDay = "2013-03-11T08:00Z"; + final Date startTime = DateUtils.parseDateOozieTZ(startInThePast); + final Date endTime = DateUtils.parseDateOozieTZ(startPlusOneDay); + CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0); + job.setNextMaterializedTime(startTime); + job.setMatThrottling(3); + final String everyHour = "0 * * * *"; + job.setFrequency(everyHour); + job.setTimeUnit(Timeunit.CRON); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job); + + new CoordMaterializeTransitionXCommand(job.getId(), 3600).call(); + final String startPlusOneHour = "2013-03-10T09:00Z"; + final String startPlusTwoHours = "2013-03-10T10:00Z"; + final Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ(startInThePast), + DateUtils.parseDateOozieTZ(startPlusOneHour), + DateUtils.parseDateOozieTZ(startPlusTwoHours)}; + checkCoordActionsNominalTime(job.getId(), 3, nominalTimes); + + try { + final JPAService jpaService = Services.get().get(JPAService.class); + job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId())); + assertFalse("coordinator job shouldn't have yet been materialized", job.isDoneMaterialization()); + assertEquals("coordinator action count mismatch", 3, job.getLastActionNumber()); + final String startPlusThreeHours = "2013-03-10T11:00Z"; + assertEquals("coordinator next materialization time mismatch", + DateUtils.parseDateOozieTZ(startPlusThreeHours), job.getNextMaterializedTime()); + } + catch (final JPAExecutorException se) { + se.printStackTrace(); + fail("Job ID " + job.getId() + " was not stored properly in db"); + } + } + + public void testCronFrequencyCatchupThrottleEqualsDuration() throws Exception { + final String startInThePast = "2013-03-10T08:00Z"; + final Date startTime = DateUtils.parseDateOozieTZ(startInThePast); + final String startPlusTwoHoursAndSome = "2013-03-10T10:01Z"; + final Date endTime = DateUtils.parseDateOozieTZ(startPlusTwoHoursAndSome); + CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0); + job.setNextMaterializedTime(startTime); + job.setMatThrottling(3); + final String everyHour = "0 * * * *"; + job.setFrequency(everyHour); + job.setTimeUnit(Timeunit.CRON); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job); + + new CoordMaterializeTransitionXCommand(job.getId(), 3600).call(); + final String startPlusOneHour = "2013-03-10T09:00Z"; + final String startPlusTwoHours = "2013-03-10T10:00Z"; + final Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ(startInThePast), + DateUtils.parseDateOozieTZ(startPlusOneHour), + DateUtils.parseDateOozieTZ(startPlusTwoHours)}; + checkCoordActionsNominalTime(job.getId(), 3, nominalTimes); + + try { + final JPAService jpaService = Services.get().get(JPAService.class); + job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId())); + assertTrue("coordinator job should have already been materialized", job.isDoneMaterialization()); + assertEquals("coordinator action count mismatch", 3, job.getLastActionNumber()); + final String startPlusThreeHours = "2013-03-10T11:00Z"; + assertEquals("coordinator next materialization time mismatch", + DateUtils.parseDateOozieTZ(startPlusThreeHours), job.getNextMaterializedTime()); + } + catch (final JPAExecutorException se) { + se.printStackTrace(); + fail("Job ID " + job.getId() + " was not stored properly in db"); + } + } + + public void testCronFrequencyCatchupThrottleMoreThanDuration() throws Exception { + final String startInThePast = "2013-03-10T08:00Z"; + final Date startTime = DateUtils.parseDateOozieTZ(startInThePast); + final String startPlusOneHourAndSome = "2013-03-10T09:01Z"; + final Date endTime = DateUtils.parseDateOozieTZ(startPlusOneHourAndSome); + CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0); + job.setNextMaterializedTime(startTime); + job.setMatThrottling(5); + final String everyHour = "0 * * * *"; + job.setFrequency(everyHour); + job.setTimeUnit(Timeunit.CRON); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job); + + new CoordMaterializeTransitionXCommand(job.getId(), 3600).call(); + final String startPlusOneHour = "2013-03-10T09:00Z"; + final Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ(startInThePast), + DateUtils.parseDateOozieTZ(startPlusOneHour)}; + checkCoordActionsNominalTime(job.getId(), 2, nominalTimes); + + try { + final JPAService jpaService = Services.get().get(JPAService.class); + job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId())); + assertTrue("coordinator job should have already been materialized", job.isDoneMaterialization()); + assertEquals("coordinator action count mismatch", 2, job.getLastActionNumber()); + final String startPlusTwoHours = "2013-03-10T10:00Z"; + assertEquals("coordinator next materialization time mismatch", + DateUtils.parseDateOozieTZ(startPlusTwoHours), job.getNextMaterializedTime()); + } + catch (final JPAExecutorException se) { + se.printStackTrace(); + fail("Job ID " + job.getId() + " was not stored properly in db"); + } + } + public void testActionMaterWithDST1() throws Exception { Date startTime = DateUtils.parseDateOozieTZ("2013-03-10T08:00Z"); Date endTime = DateUtils.parseDateOozieTZ("2013-03-10T12:00Z"); http://git-wip-us.apache.org/repos/asf/oozie/blob/cb52b495/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 05b9ee1..d7a6ed1 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.0.0 release (trunk - unreleased) +OOZIE-3173 Coordinator job with frequency using cron syntax creates only one action in catchup mode (andras.piros) OOZIE-3121 bump all maven plugins to latest versions (dbist13 via gezapeti) OOZIE-3163 Improve documentation rendering: use fluido skin and better config (hboutemy via andras.piros) OOZIE-2847 Oozie Ha timing issue (dionusos via asasvari)