Add Timer, TimerId, TimerSpec and TimerSpecs

These are the simple specifications that describe
a timer without instantiating it. A specification
consists only of the time domain.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1b780c28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1b780c28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1b780c28

Branch: refs/heads/master
Commit: 1b780c285f4be2f9780f3fdf39baa52bfbb387e2
Parents: 08fdb30
Author: Kenneth Knowles <k...@google.com>
Authored: Thu Oct 13 13:41:38 2016 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Wed Oct 19 17:52:21 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/DoFn.java    | 80 +++++++++++++++++++-
 .../java/org/apache/beam/sdk/util/Timer.java    | 56 ++++++++++++++
 .../org/apache/beam/sdk/util/TimerSpec.java     | 30 ++++++++
 .../org/apache/beam/sdk/util/TimerSpecs.java    | 41 ++++++++++
 4 files changed, 204 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b780c28/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index c86693b..8b3aaf8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -43,6 +43,8 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateSpec;
@@ -399,12 +401,13 @@ public abstract class DoFn<InputT, OutputT> implements 
Serializable, HasDisplayD
   /**
    * Annotation for declaring and dereferencing state cells.
    *
-   * <p><i>Not currently supported by any runner</i>.
+   * <p><i>Not currently supported by any runner. When ready, the feature will 
work as described
+   * here.</i>
    *
    * <p>To declare a state cell, create a field of type {@link StateSpec} 
annotated with a {@link
    * StateId}. To use the cell during processing, add a parameter of the 
appropriate {@link State}
-   * subclass to your {@link ProcessElement @ProcessElement} method, and 
annotate it with {@link
-   * StateId}. See the following code for an example:
+   * subclass to your {@link ProcessElement @ProcessElement} or {@link OnTimer 
@OnTimer} method, and
+   * annotate it with {@link StateId}. See the following code for an example:
    *
    * <pre>{@code
    * new DoFn<KV<Key, Foo>, Baz>() {
@@ -440,6 +443,77 @@ public abstract class DoFn<InputT, OutputT> implements 
Serializable, HasDisplayD
   }
 
   /**
+   * Annotation for declaring and dereferencing timers.
+   *
+   * <p><i>Not currently supported by any runner. When ready, the feature will 
work as described
+   * here.</i>
+   *
+   * <p>To declare a timer, create a field of type {@link TimerSpec} annotated 
with a {@link
+   * TimerId}. To use the timer during processing, add a parameter of the type 
{@link Timer} to your
+   * {@link ProcessElement @ProcessElement} or {@link OnTimer @OnTimer} 
method, and annotate it with
+   * {@link TimerId}. See the following code for an example:
+   *
+   * <pre>{@code
+   * new DoFn<KV<Key, Foo>, Baz>() {
+   *   @TimerId("my-timer-id")
+   *   private final TimerSpec myTimer = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+   *
+   *   @ProcessElement
+   *   public void processElement(
+   *       ProcessContext c,
+   *       @TimerId("my-timer-id") Timer myTimer) {
+   *     myTimer.setForNowPlus(Duration.standardSeconds(...));
+   *   }
+   *
+   *   @OnTimer("my-timer-id")
+   *   public void onMyTimer() {
+   *     ...
+   *   }
+   * }
+   * }</pre>
+   *
+   * <p>Timers are subject to the following validity conditions:
+   *
+   * <ul>
+   * <li>Each timer must have a distinct id.
+   * <li>Any timer referenced in a parameter must be declared.
+   * <li>Timer declarations must be final.
+   * <li>All declared timers must have a corresponding callback annotated with 
{@link
+   *     OnTimer @OnTimer}.
+   * </ul>
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ElementType.FIELD, ElementType.PARAMETER})
+  @Experimental(Kind.TIMERS)
+  public @interface TimerId {
+    /** The timer ID. */
+    String value();
+  }
+
+  /**
+   * Annotation for registering a callback for a timer.
+   *
+   * <p><i>Not currently supported by any runner. When ready, the feature will 
work as described
+   * here.</i>
+   *
+   * <p>See the javadoc for {@link TimerId} for use in a full example.
+   *
+   * <p>The method annotated with {@code @OnTimer} may have parameters 
according to the same logic
+   * as {@link ProcessElement}, but limited to the {@link BoundedWindow}, 
{@link State} subclasses,
+   * and {@link Timer}. State and timer parameters must be annotated with 
their {@link StateId} and
+   * {@link TimerId} respectively.
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  @Experimental(Kind.TIMERS)
+  public @interface OnTimer {
+    /** The timer ID. */
+    String value();
+  }
+
+  /**
    * Annotation for the method to use to prepare an instance for processing 
bundles of elements. The
    * method annotated with this must satisfy the following constraints
    * <ul>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b780c28/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java
new file mode 100644
index 0000000..556287d
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.joda.time.Duration;
+
+/**
+ * A timer for a specified time domain that can be set to register the desire 
for further processing
+ * at a particular time in its specified time domain.
+ *
+ * <p>See {@link TimeDomain} for details on the time domains available.
+ *
+ * <p>In a {@link DoFn}, a {@link Timer} is specified by a {@link TimerSpec} 
annotated with {@link
+ * DoFn.TimerId}.
+ *
+ * <p>An implementation of {@link Timer} is implicitly scoped - it may be 
scoped to a key and
+ * window, or a key, window, and trigger, etc.
+ *
+ * <p>A timer exists in one of two states: set or unset. A timer can be set 
only for a single time
+ * per scope.
+ *
+ * <p>Timer callbacks are not guaranteed to be called immediately according to 
the local view of the
+ * {@link TimeDomain}, but will be called at some time after the requested 
time, in timestamp
+ * order.
+ */
+@Experimental(Experimental.Kind.TIMERS)
+public interface Timer {
+  /**
+   * Sets or resets the time relative to the current time in the timer's 
{@link TimeDomain} at which
+   * this timer should fire. If the timer was already set, resets it to the new 
requested time.
+   */
+  void setForNowPlus(Duration durationFromNow);
+
+  /**
+   * Unsets this timer. It is permitted to {@code cancel()} whether or not the 
timer was actually
+   * set.
+   */
+  void cancel();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b780c28/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerSpec.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerSpec.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerSpec.java
new file mode 100644
index 0000000..5b7717b
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerSpec.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A specification for a {@link Timer}. This includes its {@link TimeDomain}.
+ */
+@Experimental(Kind.TIMERS)
+public interface TimerSpec extends Serializable {
+  TimeDomain getTimeDomain();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b780c28/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerSpecs.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerSpecs.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerSpecs.java
new file mode 100644
index 0000000..f289ccd
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerSpecs.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Static methods for working with {@link TimerSpec}.
+ */
+@Experimental(Kind.TIMERS)
+public class TimerSpecs {
+
+  public static TimerSpec timer(TimeDomain timeDomain) {
+    return new AutoValue_TimerSpecs_SimpleTimerSpec(timeDomain);
+  }
+
+  /**
+   * A straightforward POJO {@link TimerSpec}. Package-level access for 
AutoValue.
+   */
+  @AutoValue
+  abstract static class SimpleTimerSpec implements TimerSpec {
+    public abstract TimeDomain getTimeDomain();
+  }
+}

Reply via email to