Add Timer, TimerId, TimerSpec and TimerSpecs These are the simple specifications that describe a timer without instantiating it. Consists only of the time domain.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1b780c28 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1b780c28 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1b780c28 Branch: refs/heads/master Commit: 1b780c285f4be2f9780f3fdf39baa52bfbb387e2 Parents: 08fdb30 Author: Kenneth Knowles <k...@google.com> Authored: Thu Oct 13 13:41:38 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Wed Oct 19 17:52:21 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/transforms/DoFn.java | 80 +++++++++++++++++++- .../java/org/apache/beam/sdk/util/Timer.java | 56 ++++++++++++++ .../org/apache/beam/sdk/util/TimerSpec.java | 30 ++++++++ .../org/apache/beam/sdk/util/TimerSpecs.java | 41 ++++++++++ 4 files changed, 204 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b780c28/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index c86693b..8b3aaf8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -43,6 +43,8 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.Timer; +import org.apache.beam.sdk.util.TimerSpec; import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateSpec; @@ -399,12 +401,13 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD /** * Annotation for declaring and dereferencing state cells. * - * <p><i>Not currently supported by any runner</i>. + * <p><i>Not currently supported by any runner. When ready, the feature will work as described + * here.</i> * * <p>To declare a state cell, create a field of type {@link StateSpec} annotated with a {@link * StateId}. To use the cell during processing, add a parameter of the appropriate {@link State} - * subclass to your {@link ProcessElement @ProcessElement} method, and annotate it with {@link - * StateId}. See the following code for an example: + * subclass to your {@link ProcessElement @ProcessElement} or {@link OnTimer @OnTimer} method, and + * annotate it with {@link StateId}. See the following code for an example: * * <pre>{@code * new DoFn<KV<Key, Foo>, Baz>() { @@ -440,6 +443,77 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD } /** + * Annotation for declaring and dereferencing timers. + * + * <p><i>Not currently supported by any runner. When ready, the feature will work as described + * here.</i> + * + * <p>To declare a timer, create a field of type {@link TimerSpec} annotated with a {@link + * TimerId}. To use the cell during processing, add a parameter of the type {@link Timer} to your + * {@link ProcessElement @ProcessElement} or {@link OnTimer @OnTimer} method, and annotate it with + * {@link TimerId}. See the following code for an example: + * + * <pre>{@code + * new DoFn<KV<Key, Foo>, Baz>() { + * @TimerId("my-timer-id") + * private final TimerSpec myTimer = TimerSpecs.timerForDomain(TimeDomain.EVENT_TIME); + * + * @ProcessElement + * public void processElement( + * ProcessContext c, + * @TimerId("my-timer-id") Timer myTimer) { + * myTimer.setForNowPlus(Duration.standardSeconds(...)); + * } + * + * @OnTimer("my-timer-id") + * public void onMyTimer() { + * ... + * } + * } + * }</pre> + * + * <p>Timers are subject to the following validity conditions: + * + * <ul> + * <li>Each timer must have a distinct id. + * <li>Any timer referenced in a parameter must be declared. + * <li>Timer declarations must be final. + * <li>All declared timers must have a corresponding callback annotated with {@link + * OnTimer @OnTimer}. + * </ul> + */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + @Target({ElementType.FIELD, ElementType.PARAMETER}) + @Experimental(Kind.TIMERS) + public @interface TimerId { + /** The timer ID. */ + String value(); + } + + /** + * Annotation for registering a callback for a timer. + * + * <p><i>Not currently supported by any runner. When ready, the feature will work as described + * here.</i> + * + * <p>See the javadoc for {@link TimerId} for use in a full example. + * + * <p>The method annotated with {@code @OnTimer} may have parameters according to the same logic + * as {@link ProcessElement}, but limited to the {@link BoundedWindow}, {@link State} subclasses, + * and {@link Timer}. State and timer parameters must be annotated with their {@link StateId} and + * {@link TimerId} respectively. + */ + @Documented + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + @Experimental(Kind.TIMERS) + public @interface OnTimer { + /** The timer ID. */ + String value(); + } + + /** * Annotation for the method to use to prepare an instance for processing bundles of elements. The * method annotated with this must satisfy the following constraints * <ul> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b780c28/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java new file mode 100644 index 0000000..556287d --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.DoFn; +import org.joda.time.Duration; + +/** + * A timer for a specified time domain that can be set to register the desire for further processing + * at particular time in its specified time domain. + * + * <p>See {@link TimeDomain} for details on the time domains available. + * + * <p>In a {@link DoFn}, a {@link Timer} is specified by a {@link TimerSpec} annotated with {@link + * DoFn.TimerId}. + * + * <p>An implementation of {@link Timer} is implicitly scoped - it may be scoped to a key and + * window, or a key, window, and trigger, etc. + * + * <p>A timer exists in one of two states: set or unset. A timer can be set only for a single time + * per scope. + * + * <p>Timer callbacks are not guaranteed to be called immediately according to the local view of the + * {@link TimeDomain}, but will be called at some time after the requested time, in timestamp + * order. + */ +@Experimental(Experimental.Kind.TIMERS) +public interface Timer { + /** + * Sets or resets the time relative to the current time in the timer's {@link TimeDomain} at which + * this it should fire. If the timer was already set, resets it to the new requested time. + */ + void setForNowPlus(Duration durationFromNow); + + /** + * Unsets this timer. It is permitted to {@code cancel()} whether or not the timer was actually + * set. + */ + void cancel(); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b780c28/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerSpec.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerSpec.java new file mode 100644 index 0000000..5b7717b --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerSpec.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import java.io.Serializable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * A specification for a {@link Timer}. This includes its {@link TimeDomain}. + */ +@Experimental(Kind.TIMERS) +public interface TimerSpec extends Serializable { + TimeDomain getTimeDomain(); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b780c28/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerSpecs.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerSpecs.java new file mode 100644 index 0000000..f289ccd --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerSpecs.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * Static methods for working with {@link TimerSpec}. + */ +@Experimental(Kind.TIMERS) +public class TimerSpecs { + + public static TimerSpec timer(TimeDomain timeDomain) { + return new AutoValue_TimerSpecs_SimpleTimerSpec(timeDomain); + } + + /** + * A straightforward POJO {@link TimerSpec}. Package-level access for AutoValue. + */ + @AutoValue + abstract static class SimpleTimerSpec implements TimerSpec { + public abstract TimeDomain getTimeDomain(); + } +}