Repository: beam Updated Branches: refs/heads/master 9061c65e3 -> db19c7df5
Reify delay and alignment in AfterProcessingTime transform Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/40c4a5cb Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/40c4a5cb Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/40c4a5cb Branch: refs/heads/master Commit: 40c4a5cb6eaa0350a26fe1f215eb812541a7b105 Parents: bea101a Author: Kenneth Knowles <k...@google.com> Authored: Sun Feb 12 15:03:48 2017 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Feb 14 14:55:48 2017 -0800 ---------------------------------------------------------------------- .../AfterDelayFromFirstElementStateMachine.java | 4 +- .../AfterProcessingTimeStateMachine.java | 2 + ...rSynchronizedProcessingTimeStateMachine.java | 1 + .../core/triggers/TriggerStateMachines.java | 36 ++- .../core/triggers/TriggerStateMachinesTest.java | 7 +- .../windowing/AfterDelayFromFirstElement.java | 240 ------------------- .../windowing/AfterProcessingTime.java | 105 ++++++-- .../AfterSynchronizedProcessingTime.java | 25 +- .../windowing/TimestampTransform.java | 64 +++++ .../windowing/AfterProcessingTimeTest.java | 2 +- 10 files changed, 192 insertions(+), 294 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/40c4a5cb/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java index b720644..4444c22 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java +++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java @@ -47,6 +47,8 @@ import org.joda.time.format.PeriodFormatter; * * <p>This class is for internal use only and may change at any time. */ +// This class should be inlined to subclasses and deleted, simplifying them too +// https://issues.apache.org/jira/browse/BEAM-1486 @Experimental(Experimental.Kind.TRIGGER) public abstract class AfterDelayFromFirstElementStateMachine extends OnceTriggerStateMachine { @@ -250,7 +252,7 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger /** * A {@link SerializableFunction} to delay the timestamp at which this triggers fires. */ - private static final class DelayFn implements SerializableFunction<Instant, Instant> { + static final class DelayFn implements SerializableFunction<Instant, Instant> { private final Duration delay; public DelayFn(Duration delay) { http://git-wip-us.apache.org/repos/asf/beam/blob/40c4a5cb/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java index 2490463..eaf5613 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachine.java @@ -34,6 +34,8 @@ import org.joda.time.Instant; * AfterDelayFromFirstElementStateMachine#plusDelayOf} or {@link * AfterDelayFromFirstElementStateMachine#alignedTo}. 
*/ +// The superclass should be inlined here, its only real use +// https://issues.apache.org/jira/browse/BEAM-1486 @Experimental(Experimental.Kind.TRIGGER) public class AfterProcessingTimeStateMachine extends AfterDelayFromFirstElementStateMachine { http://git-wip-us.apache.org/repos/asf/beam/blob/40c4a5cb/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java index 000f6e7..1319a13 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterSynchronizedProcessingTimeStateMachine.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.TimeDomain; import org.joda.time.Instant; +// This should not really have the superclass https://issues.apache.org/jira/browse/BEAM-1486 class AfterSynchronizedProcessingTimeStateMachine extends AfterDelayFromFirstElementStateMachine { @Override http://git-wip-us.apache.org/repos/asf/beam/blob/40c4a5cb/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java index 1be7981..f0e9d21 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java +++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java @@ -18,14 +18,16 @@ package org.apache.beam.runners.core.triggers; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; import com.google.common.collect.Lists; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.List; +import javax.annotation.Nonnull; import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.AfterAll; -import org.apache.beam.sdk.transforms.windowing.AfterDelayFromFirstElement; import org.apache.beam.sdk.transforms.windowing.AfterEach; import org.apache.beam.sdk.transforms.windowing.AfterFirst; import org.apache.beam.sdk.transforms.windowing.AfterPane; @@ -36,6 +38,7 @@ import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger; import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger; import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.TimestampTransform; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; import org.apache.beam.sdk.util.ReshuffleTrigger; @@ -171,18 +174,37 @@ public class TriggerStateMachines { } private OnceTriggerStateMachine evaluateSpecific(AfterProcessingTime v) { - return evaluateSpecific((AfterDelayFromFirstElement) v); - } - - private OnceTriggerStateMachine evaluateSpecific(final AfterDelayFromFirstElement v) { return new AfterDelayFromFirstElementStateMachineAdapter(v); } private static class AfterDelayFromFirstElementStateMachineAdapter extends AfterDelayFromFirstElementStateMachine { - public 
AfterDelayFromFirstElementStateMachineAdapter(AfterDelayFromFirstElement v) { - this(v.getTimeDomain(), v.getTimestampMappers()); + private static final Function<TimestampTransform, SerializableFunction<Instant, Instant>> + CONVERT_TIMESTAMP_TRANSFORM = + new Function<TimestampTransform, SerializableFunction<Instant, Instant>>() { + @Override + public SerializableFunction<Instant, Instant> apply( + @Nonnull TimestampTransform transform) { + if (transform instanceof TimestampTransform.Delay) { + return new DelayFn(((TimestampTransform.Delay) transform).getDelay()); + } else if (transform instanceof TimestampTransform.AlignTo) { + TimestampTransform.AlignTo alignTo = (TimestampTransform.AlignTo) transform; + return new AlignFn(alignTo.getPeriod(), alignTo.getOffset()); + } else { + throw new IllegalArgumentException( + String.format( + "Unknown %s: %s", TimestampTransform.class.getSimpleName(), transform)); + } + } + }; + + public AfterDelayFromFirstElementStateMachineAdapter(AfterProcessingTime v) { + this( + TimeDomain.PROCESSING_TIME, + FluentIterable.from(v.getTimestampTransforms()) + .transform(CONVERT_TIMESTAMP_TRANSFORM) + .toList()); } private AfterDelayFromFirstElementStateMachineAdapter( http://git-wip-us.apache.org/repos/asf/beam/blob/40c4a5cb/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java index 37f8f10..26c0597 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertThat; import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.transforms.windowing.AfterAll; -import org.apache.beam.sdk.transforms.windowing.AfterDelayFromFirstElement; import org.apache.beam.sdk.transforms.windowing.AfterEach; import org.apache.beam.sdk.transforms.windowing.AfterFirst; import org.apache.beam.sdk.transforms.windowing.AfterPane; @@ -64,7 +63,7 @@ public class TriggerStateMachinesTest { Duration minutes = Duration.standardMinutes(94); Duration hours = Duration.standardHours(13); - AfterDelayFromFirstElement trigger = + AfterProcessingTime trigger = AfterProcessingTime.pastFirstElementInPane().plusDelayOf(minutes).alignedTo(hours); AfterDelayFromFirstElementStateMachine machine = @@ -72,10 +71,6 @@ public class TriggerStateMachinesTest { TriggerStateMachines.stateMachineForOnceTrigger(trigger); assertThat(machine.getTimeDomain(), equalTo(TimeDomain.PROCESSING_TIME)); - - // This equality is function equality, but due to the structure of the code (no serialization) - // it is OK to check - assertThat(machine.getTimestampMappers(), equalTo(trigger.getTimestampMappers())); } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/40c4a5cb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java deleted file mode 100644 index da6d74a..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import com.google.common.collect.ImmutableList; -import java.util.List; -import java.util.Locale; -import java.util.Objects; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.TimeDomain; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.joda.time.format.PeriodFormat; -import org.joda.time.format.PeriodFormatter; - -/** - * A base class for triggers that happen after a processing time delay from the arrival - * of the first element in a pane. - * - * <p>This class is for internal use only and may change at any time. - */ -@Experimental(Experimental.Kind.TRIGGER) -public abstract class AfterDelayFromFirstElement extends OnceTrigger { - - protected static final List<SerializableFunction<Instant, Instant>> IDENTITY = - ImmutableList.<SerializableFunction<Instant, Instant>>of(); - - private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH); - - /** - * To complete an implementation, return a new instance like this one, but incorporating - * the provided timestamp mapping functions. 
Generally should be used by calling the - * constructor of this class from the constructor of the subclass. - */ - protected abstract AfterDelayFromFirstElement newWith( - List<SerializableFunction<Instant, Instant>> transform); - - /** - * A list of timestampMappers m1, m2, m3, ... m_n considered to be composed in sequence. The - * overall mapping for an instance `instance` is `m_n(... m3(m2(m1(instant))`, - * implemented via #computeTargetTimestamp - */ - protected final List<SerializableFunction<Instant, Instant>> timestampMappers; - - private final TimeDomain timeDomain; - - public AfterDelayFromFirstElement( - TimeDomain timeDomain, - List<SerializableFunction<Instant, Instant>> timestampMappers) { - super(null); - this.timestampMappers = timestampMappers; - this.timeDomain = timeDomain; - } - - /** - * The time domain according for which this trigger sets timers. - */ - public TimeDomain getTimeDomain() { - return timeDomain; - } - - /** - * The mapping functions applied to the arrival time of an element to determine when to - * set a wake-up timer for triggering. - */ - public List<SerializableFunction<Instant, Instant>> getTimestampMappers() { - return timestampMappers; - } - - /** - * Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater - * than the timestamp. - * - * <p>TODO: Consider sharing this with FixedWindows, and bring over the equivalent of - * CalendarWindows. - */ - public AfterDelayFromFirstElement alignedTo(final Duration size, final Instant offset) { - return newWith(new AlignFn(size, offset)); - } - - /** - * Aligns the time to be the smallest multiple of {@code size} greater than the timestamp - * since the epoch. - */ - public AfterDelayFromFirstElement alignedTo(final Duration size) { - return alignedTo(size, new Instant(0)); - } - - /** - * Adds some delay to the original target time. 
- * - * @param delay the delay to add - * @return An updated time trigger that will wait the additional time before firing. - */ - public AfterDelayFromFirstElement plusDelayOf(final Duration delay) { - return newWith(new DelayFn(delay)); - } - - /** - * @deprecated This will be removed in the next major version. Please use only - * {@link #plusDelayOf} and {@link #alignedTo}. - */ - @Deprecated - public OnceTrigger mappedTo(SerializableFunction<Instant, Instant> timestampMapper) { - return newWith(timestampMapper); - } - - @Override - public boolean isCompatible(Trigger other) { - if (!getClass().equals(other.getClass())) { - return false; - } - - AfterDelayFromFirstElement that = (AfterDelayFromFirstElement) other; - return this.timestampMappers.equals(that.timestampMappers); - } - - - private AfterDelayFromFirstElement newWith( - SerializableFunction<Instant, Instant> timestampMapper) { - return newWith( - ImmutableList.<SerializableFunction<Instant, Instant>>builder() - .addAll(timestampMappers) - .add(timestampMapper) - .build()); - } - - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - return BoundedWindow.TIMESTAMP_MAX_VALUE; - } - - protected Instant computeTargetTimestamp(Instant time) { - Instant result = time; - for (SerializableFunction<Instant, Instant> timestampMapper : timestampMappers) { - result = timestampMapper.apply(result); - } - return result; - } - - /** - * A {@link SerializableFunction} to delay the timestamp at which this triggers fires. 
- */ - private static final class DelayFn implements SerializableFunction<Instant, Instant> { - private final Duration delay; - - public DelayFn(Duration delay) { - this.delay = delay; - } - - @Override - public Instant apply(Instant input) { - return input.plus(delay); - } - - @Override - public boolean equals(Object object) { - if (object == this) { - return true; - } - - if (!(object instanceof DelayFn)) { - return false; - } - - return this.delay.equals(((DelayFn) object).delay); - } - - @Override - public int hashCode() { - return Objects.hash(delay); - } - - @Override - public String toString() { - return PERIOD_FORMATTER.print(delay.toPeriod()); - } - } - - /** - * A {@link SerializableFunction} to align an instant to the nearest interval boundary. - */ - static final class AlignFn implements SerializableFunction<Instant, Instant> { - private final Duration size; - private final Instant offset; - - - /** - * Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater - * than the timestamp. - */ - public AlignFn(Duration size, Instant offset) { - this.size = size; - this.offset = offset; - } - - @Override - public Instant apply(Instant point) { - long millisSinceStart = new Duration(offset, point).getMillis() % size.getMillis(); - return millisSinceStart == 0 ? 
point : point.plus(size).minus(millisSinceStart); - } - - @Override - public boolean equals(Object object) { - if (object == this) { - return true; - } - - if (!(object instanceof AlignFn)) { - return false; - } - - AlignFn other = (AlignFn) object; - return other.size.equals(this.size) - && other.offset.equals(this.offset); - } - - @Override - public int hashCode() { - return Objects.hash(size, offset); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/40c4a5cb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java index 09f288e..eda269a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java @@ -17,40 +17,87 @@ */ package org.apache.beam.sdk.transforms.windowing; +import com.google.common.collect.ImmutableList; +import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.Objects; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.joda.time.Duration; import org.joda.time.Instant; +import org.joda.time.format.PeriodFormat; +import org.joda.time.format.PeriodFormatter; /** * {@code AfterProcessingTime} triggers fire based on the current processing time. They operate in * the real-time domain. 
- * - * <p>The time at which to fire the timer can be adjusted via the methods in - * {@link AfterDelayFromFirstElement}, such as {@link AfterDelayFromFirstElement#plusDelayOf} or - * {@link AfterDelayFromFirstElement#alignedTo}. */ @Experimental(Experimental.Kind.TRIGGER) -public class AfterProcessingTime extends AfterDelayFromFirstElement { +public class AfterProcessingTime extends OnceTrigger { + + private static final PeriodFormatter DURATION_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH); + + private final List<TimestampTransform> timestampTransforms; - private AfterProcessingTime(List<SerializableFunction<Instant, Instant>> transforms) { - super(TimeDomain.PROCESSING_TIME, transforms); + public AfterProcessingTime(List<TimestampTransform> timestampTransforms) { + super(null); + this.timestampTransforms = timestampTransforms; } /** - * Creates a trigger that fires when the current processing time passes the processing time - * at which this trigger saw the first element in a pane. + * Creates a trigger that fires when the current processing time passes the processing time at + * which this trigger saw the first element in a pane. */ public static AfterProcessingTime pastFirstElementInPane() { - return new AfterProcessingTime(IDENTITY); + return new AfterProcessingTime(Collections.<TimestampTransform>emptyList()); + } + + /** + * The transforms applied to the arrival time of an element to determine when this trigger allows + * output. + */ + public List<TimestampTransform> getTimestampTransforms() { + return timestampTransforms; + } + + /** + * Adds some delay to the original target time. + * + * @param delay the delay to add + * @return An updated time trigger that will wait the additional time before firing. 
+ */ + public AfterProcessingTime plusDelayOf(final Duration delay) { + return new AfterProcessingTime( + ImmutableList.<TimestampTransform>builder() + .addAll(timestampTransforms) + .add(TimestampTransform.delay(delay)) + .build()); + } + + /** + * Aligns timestamps to the smallest multiple of {@code period} since the {@code offset} greater + * than the timestamp. + */ + public AfterProcessingTime alignedTo(final Duration period, final Instant offset) { + return new AfterProcessingTime( + ImmutableList.<TimestampTransform>builder() + .addAll(timestampTransforms) + .add(TimestampTransform.alignTo(period, offset)) + .build()); + } + + /** + * Aligns the time to be the smallest multiple of {@code period} since the epoch (aka + * {@code new Instant(0)}) greater than the timestamp. + */ + public AfterProcessingTime alignedTo(final Duration period) { + return alignedTo(period, new Instant(0)); + } @Override - protected AfterProcessingTime newWith( - List<SerializableFunction<Instant, Instant>> transforms) { - return new AfterProcessingTime(transforms); + public boolean isCompatible(Trigger other) { + return this.equals(other); } @Override @@ -60,17 +107,28 @@ public class AfterProcessingTime extends AfterDelayFromFirstElement { @Override protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { - return new AfterSynchronizedProcessingTime(); + return AfterSynchronizedProcessingTime.ofFirstElement(); } @Override public String toString() { StringBuilder builder = new StringBuilder("AfterProcessingTime.pastFirstElementInPane()"); - for (SerializableFunction<Instant, Instant> delayFn : timestampMappers) { - builder - .append(".plusDelayOf(") - .append(delayFn) - .append(")"); + for (TimestampTransform transform : getTimestampTransforms()) { + if (transform instanceof TimestampTransform.Delay) { + TimestampTransform.Delay delay = (TimestampTransform.Delay) transform; + builder + .append(".plusDelayOf(") + .append(DURATION_FORMATTER.print(delay.getDelay().toPeriod())) 
+ .append(")"); + } else if (transform instanceof TimestampTransform.AlignTo) { + TimestampTransform.AlignTo alignTo = (TimestampTransform.AlignTo) transform; + builder + .append(".alignedTo(") + .append(DURATION_FORMATTER.print(alignTo.getPeriod().toPeriod())) + .append(", ") + .append(alignTo.getOffset()) + .append(")"); + } } return builder.toString(); @@ -84,12 +142,13 @@ public class AfterProcessingTime extends AfterDelayFromFirstElement { if (!(obj instanceof AfterProcessingTime)) { return false; } + AfterProcessingTime that = (AfterProcessingTime) obj; - return Objects.equals(this.timestampMappers, that.timestampMappers); + return getTimestampTransforms().equals(that.getTimestampTransforms()); } @Override public int hashCode() { - return Objects.hash(getClass(), this.timestampMappers); + return Objects.hash(getTimestampTransforms()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/40c4a5cb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java index b6258f8..6249954 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java @@ -18,21 +18,22 @@ package org.apache.beam.sdk.transforms.windowing; import com.google.common.base.Objects; -import java.util.Collections; import java.util.List; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; import org.joda.time.Instant; /** - * A trigger that fires after synchronized 
processing time has reached a shared - * threshold between upstream workers. + * A trigger that fires after synchronized processing time has reached the processing time of the + * first element's arrival. + * + * <p>This is for internal use, primarily as a "continuation trigger" for {@link + * AfterProcessingTime} triggers. In that use, this trigger is ready as soon as all upstream + * workers' processing time clocks have caught up to the moment that input arrived. */ -public class AfterSynchronizedProcessingTime extends AfterDelayFromFirstElement { +public class AfterSynchronizedProcessingTime extends OnceTrigger { public AfterSynchronizedProcessingTime() { - super(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, - Collections.<SerializableFunction<Instant, Instant>>emptyList()); + super(null); } @Override @@ -59,12 +60,4 @@ public class AfterSynchronizedProcessingTime extends AfterDelayFromFirstElement public int hashCode() { return Objects.hashCode(AfterSynchronizedProcessingTime.class); } - - @Override - protected AfterSynchronizedProcessingTime - newWith(List<SerializableFunction<Instant, Instant>> transforms) { - // ignore transforms - return this; - } - } http://git-wip-us.apache.org/repos/asf/beam/blob/40c4a5cb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java new file mode 100644 index 0000000..b16e968 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampTransform.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import com.google.auto.value.AutoValue; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** An abstract description of a standardized transformation on timestamps. */ +public abstract class TimestampTransform { + + /** Returns a transform that shifts a timestamp later by {@code delay}. */ + public static TimestampTransform delay(Duration delay) { + return new AutoValue_TimestampTransform_Delay(delay); + } + + /** + * Returns a transform that aligns a timestamp to the next boundary of {@code period}, starting + * from {@code offset}. + */ + public static TimestampTransform alignTo(Duration period, Instant offset) { + return new AutoValue_TimestampTransform_AlignTo(period, offset); + } + + /** + * Returns a transform that aligns a timestamp to the next boundary of {@code period}, starting + * from the start of the epoch. + */ + public static TimestampTransform alignTo(Duration period) { + return alignTo(period, new Instant(0)); + } + + /** + * Represents the transform that aligns a timestamp to the next boundary of {@link #getPeriod()} + * starting at {@link #getOffset()}. 
+ */ + @AutoValue + public abstract static class AlignTo extends TimestampTransform { + public abstract Duration getPeriod(); + + public abstract Instant getOffset(); + } + + /** Represents the transform that delays a timestamp by {@link #getDelay()}. */ + @AutoValue + public abstract static class Delay extends TimestampTransform { + public abstract Duration getDelay(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/40c4a5cb/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java index 4984d7c..006ff00 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java @@ -45,7 +45,7 @@ public class AfterProcessingTimeTest { OnceTrigger firstElementPlus1 = AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)); assertEquals( - new AfterSynchronizedProcessingTime(), + AfterSynchronizedProcessingTime.ofFirstElement(), firstElementPlus1.getContinuationTrigger()); }