Repository: samza
Updated Branches:
  refs/heads/master 78ee98261 -> aa0586558


SAMZA-1498: Support arbitrary system clock timer in operators

This patch adds the capability to register arbitrary timers for both high-level 
and low-level api.
For high-level, InitableFunction will pass the TimerRegistry to user through 
the new OpContext, and user will implement the TimerFunction to get timer 
notifications.
For low-level api, user can register timer in the TaskContext, and then 
implements the TimerCallback for specific timer actions.

Author: xiliu <xi...@linkedin.com>
Author: xinyuiscool <xinyuliu...@gmail.com>
Author: xinyuiscool <xi...@linkedin.com>

Reviewers: Pateek M <prate...@gmail.com>

Closes #419 from xinyuiscool/SAMZA-1498


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/aa058655
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/aa058655
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/aa058655

Branch: refs/heads/master
Commit: aa058655890735274f679f009db2a64f87ec3c04
Parents: 78ee982
Author: xiliu <xi...@linkedin.com>
Authored: Tue Mar 6 18:20:15 2018 -0800
Committer: xiliu <xi...@linkedin.com>
Committed: Tue Mar 6 18:20:15 2018 -0800

----------------------------------------------------------------------
 .../apache/samza/operators/TimerRegistry.java   |  41 +++++
 .../operators/functions/InitableFunction.java   |   3 +-
 .../operators/functions/TimerFunction.java      |  65 +++++++
 .../java/org/apache/samza/task/TaskContext.java |  19 ++
 .../org/apache/samza/task/TimerCallback.java    |  34 ++++
 .../apache/samza/container/TaskContextImpl.java |  31 +++-
 .../samza/operators/impl/OperatorImpl.java      |  42 ++++-
 .../samza/operators/impl/OperatorImplGraph.java |  31 ++--
 .../operators/spec/BroadcastOperatorSpec.java   |   6 +
 .../samza/operators/spec/InputOperatorSpec.java |   8 +-
 .../samza/operators/spec/JoinOperatorSpec.java  |   6 +
 .../samza/operators/spec/OperatorSpec.java      |   3 +
 .../samza/operators/spec/OperatorSpecs.java     |   8 +-
 .../operators/spec/OutputOperatorSpec.java      |   6 +
 .../operators/spec/PartitionByOperatorSpec.java |   6 +
 .../operators/spec/SendToTableOperatorSpec.java |   6 +
 .../samza/operators/spec/SinkOperatorSpec.java  |   6 +
 .../operators/spec/StreamOperatorSpec.java      |  13 +-
 .../spec/StreamTableJoinOperatorSpec.java       |   6 +
 .../operators/spec/WindowOperatorSpec.java      |   7 +
 .../org/apache/samza/task/AsyncRunLoop.java     |  79 +++++++--
 .../apache/samza/task/SystemTimerScheduler.java | 154 ++++++++++++++++
 .../apache/samza/container/SamzaContainer.scala |  25 ++-
 .../samza/container/SamzaContainerMetrics.scala |   2 +
 .../apache/samza/container/TaskInstance.scala   |  19 +-
 .../samza/operators/impl/TestOperatorImpl.java  |   6 +
 .../operators/impl/TestOperatorImplGraph.java   |   4 +-
 .../org/apache/samza/task/TestAsyncRunLoop.java |   3 +-
 .../samza/task/TestSystemTimerScheduler.java    | 176 +++++++++++++++++++
 .../apache/samza/test/timer/TestTimerApp.java   |  86 +++++++++
 .../org/apache/samza/test/timer/TimerTest.java  |  51 ++++++
 31 files changed, 904 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-api/src/main/java/org/apache/samza/operators/TimerRegistry.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/TimerRegistry.java 
b/samza-api/src/main/java/org/apache/samza/operators/TimerRegistry.java
new file mode 100644
index 0000000..64dd4ec
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/TimerRegistry.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators;
+
+/**
+ * Allows registering epoch-time timer callbacks from the operators.
+ * See {@link org.apache.samza.operators.functions.TimerFunction} for details.
+ * @param <K> type of the timer key
+ */
+public interface TimerRegistry<K> {
+
+  /**
+   * Register a epoch-time timer with key.
+   * @param key unique timer key
+   * @param timestamp epoch time when the timer will be fired, in milliseconds
+   */
+  void register(K key, long timestamp);
+
+  /**
+   * Delete the timer for the provided key.
+   * @param key key for the timer to delete
+   */
+  void delete(K key);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
 
b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
index b08c6cd..6651819 100644
--- 
a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
@@ -16,13 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.samza.operators.functions;
 
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.Config;
 import org.apache.samza.task.TaskContext;
 
-
 /**
  * A function that can be initialized before execution.
  *
@@ -41,5 +41,4 @@ public interface InitableFunction {
    * @param context the {@link TaskContext} for this task
    */
   default void init(Config config, TaskContext context) { }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java
 
b/samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java
new file mode 100644
index 0000000..01825c6
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.operators.TimerRegistry;
+
+import java.util.Collection;
+
+/**
+ * Allows timer registration with a key and is invoked when the timer is fired.
+ * Key must be a unique identifier for this timer, and is provided in the 
callback when the timer fires.
+ *
+ * <p>
+ * Example of a {@link FlatMapFunction} with timer:
+ * <pre>{@code
+ *    public class ExampleTimerFn implements FlatMapFunction<String, String>, 
TimerFunction<String, String> {
+ *      public void registerTimer(TimerRegistry timerRegistry) {
+ *        long time = System.currentTimeMillis() + 5000; // fire after 5 sec
+ *        timerRegistry.register("example-timer", time);
+ *      }
+ *      public Collection<String> apply(String s) {
+ *        ...
+ *      }
+ *      public Collection<String> onTimer(String key, long timestamp) {
+ *        // example-timer fired
+ *        ...
+ *      }
+ *    }
+ * }</pre>
+ * @param <K> type of the key
+ * @param <OM> type of the output
+ */
+public interface TimerFunction<K, OM> {
+
+  /**
+   * Registers any epoch-time timers using the registry
+   * @param timerRegistry a keyed {@link TimerRegistry}
+   */
+  void registerTimer(TimerRegistry<K> timerRegistry);
+
+  /**
+   * Returns the output after the timer with key fires.
+   * @param key timer key
+   * @param timestamp time of the epoch-time timer fired, in milliseconds
+   * @return {@link Collection} of output elements
+   */
+  Collection<OM> onTimer(K key, long timestamp);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java 
b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
index 11ffacc..ea2a3bc 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
@@ -74,4 +74,23 @@ public interface TaskContext {
   default Object getUserContext() {
     return null;
   }
+
+  /**
+   * Register a keyed timer with a callback of {@link TimerCallback} in this 
task.
+   * The callback will be invoked exclusively with any other operations for 
this task,
+   * e.g. processing, windowing and commit.
+   * @param key timer key
+   * @param timestamp epoch time when the timer will be fired, in milliseconds
+   * @param callback callback when the timer is fired
+   * @param <K> type of the key
+   */
+  <K> void registerTimer(K key, long timestamp, TimerCallback<K> callback);
+
+  /**
+   * Delete the keyed timer in this task.
+   * Deletion only happens if the timer hasn't been fired. Otherwise it will 
not interrupt.
+   * @param key timer key
+   * @param <K> type of the key
+   */
+  <K> void deleteTimer(K key);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-api/src/main/java/org/apache/samza/task/TimerCallback.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TimerCallback.java 
b/samza-api/src/main/java/org/apache/samza/task/TimerCallback.java
new file mode 100644
index 0000000..3add129
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/task/TimerCallback.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+/**
+ * The callback that is invoked when its corresponding timer registered via 
{@link TaskContext} fires.
+ * @param <K> type of the timer key
+ */
+public interface TimerCallback<K> {
+  /**
+   * Invoked when the timer of key fires.
+   * @param key timer key
+   * @param collector contains the means of sending message envelopes to the 
output stream.
+   * @param coordinator manages execution of tasks.
+   */
+  void onTimer(K key, MessageCollector collector, TaskCoordinator coordinator);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java 
b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
index 0248486..0d76a33 100644
--- a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
@@ -19,10 +19,7 @@
 
 package org.apache.samza.container;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.collect.ImmutableSet;
 import org.apache.samza.checkpoint.OffsetManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.ReadableMetricsRegistry;
@@ -32,11 +29,16 @@ import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.TableManager;
+import org.apache.samza.task.SystemTimerScheduler;
 import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TimerCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
 
 public class TaskContextImpl implements TaskContext {
   private static final Logger LOG = 
LoggerFactory.getLogger(TaskContextImpl.class);
@@ -51,6 +53,7 @@ public class TaskContextImpl implements TaskContext {
   private final JobModel jobModel;
   private final StreamMetadataCache streamMetadataCache;
   private final Map<String, Object> objectRegistry = new HashMap<>();
+  private final SystemTimerScheduler timerScheduler;
 
   private Object userContext = null;
 
@@ -62,7 +65,8 @@ public class TaskContextImpl implements TaskContext {
                          TaskStorageManager storageManager,
                          TableManager tableManager,
                          JobModel jobModel,
-                         StreamMetadataCache streamMetadataCache) {
+                         StreamMetadataCache streamMetadataCache,
+                         ScheduledExecutorService timerExecutor) {
     this.taskName = taskName;
     this.metrics = metrics;
     this.containerContext = containerContext;
@@ -72,6 +76,7 @@ public class TaskContextImpl implements TaskContext {
     this.tableManager = tableManager;
     this.jobModel = jobModel;
     this.streamMetadataCache = streamMetadataCache;
+    this.timerScheduler = SystemTimerScheduler.create(timerExecutor);
   }
 
   @Override
@@ -129,6 +134,16 @@ public class TaskContextImpl implements TaskContext {
     return userContext;
   }
 
+  @Override
+  public <K> void registerTimer(K key, long timestamp, TimerCallback<K> 
callback) {
+    timerScheduler.setTimer(key, timestamp, callback);
+  }
+
+  @Override
+  public <K> void deleteTimer(K key) {
+    timerScheduler.deleteTimer(key);
+  }
+
   public void registerObject(String name, Object value) {
     objectRegistry.put(name, value);
   }
@@ -144,4 +159,8 @@ public class TaskContextImpl implements TaskContext {
   public StreamMetadataCache getStreamMetadataCache() {
     return streamMetadataCache;
   }
+
+  public SystemTimerScheduler getTimerScheduler() {
+    return timerScheduler;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 15b763d..7219180 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -25,6 +25,8 @@ import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.operators.TimerRegistry;
+import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.system.EndOfStreamMessage;
 import org.apache.samza.metrics.Counter;
@@ -80,6 +82,7 @@ public abstract class OperatorImpl<M, RM> {
   private EndOfStreamStates eosStates;
   // watermark states
   private WatermarkStates watermarkStates;
+  private TaskContext taskContext;
 
   /**
    * Initialize this {@link OperatorImpl} and its user-defined functions.
@@ -121,7 +124,8 @@ public abstract class OperatorImpl<M, RM> {
       this.usedInCurrentTask = true;
     }
 
-    handleInit(config, context);
+    this.taskContext = taskContext;
+    handleInit(config, taskContext);
 
     initialized = true;
   }
@@ -415,6 +419,42 @@ public abstract class OperatorImpl<M, RM> {
     }
   }
 
+  /**
+   * Returns a registry which allows registering arbitrary system-clock timer 
with K-typed key.
+   * The user-defined function in the operator spec needs to implement {@link 
TimerFunction#onTimer(Object, long)}
+   * for timer notifications.
+   * @param <K> key type for the timer.
+   * @return an instance of {@link TimerRegistry}
+   */
+  <K> TimerRegistry<K> createOperatorTimerRegistry() {
+    return new TimerRegistry<K>() {
+      @Override
+      public void register(K key, long time) {
+        taskContext.registerTimer(key, time, (k, collector, coordinator) -> {
+            final TimerFunction<K, RM> timerFn = 
getOperatorSpec().getTimerFn();
+            if (timerFn != null) {
+              final Collection<RM> output = timerFn.onTimer(key, time);
+
+              if (!output.isEmpty()) {
+                output.forEach(rm ->
+                    registeredOperators.forEach(op ->
+                        op.onMessage(rm, collector, coordinator)));
+              }
+            } else {
+              throw new SamzaException(
+                  String.format("Operator %s id %s (created at %s) must 
implement TimerFunction to use system timer.",
+                      getOperatorSpec().getOpCode().name(), getOpImplId(), 
getOperatorSpec().getSourceLocation()));
+            }
+          });
+      }
+
+      @Override
+      public void delete(K key) {
+        taskContext.deleteTimer(key);
+      }
+    };
+  }
+
   public void close() {
     if (closed) {
       throw new IllegalStateException(

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 3882544..bbc8783 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -18,20 +18,15 @@
  */
 package org.apache.samza.operators.impl;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
 import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.TimerRegistry;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.impl.store.TimestampedValue;
@@ -41,11 +36,11 @@ import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.PartitionByOperatorSpec;
+import org.apache.samza.operators.spec.SendToTableOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.spec.SendToTableOperatorSpec;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.TaskContext;
@@ -53,9 +48,14 @@ import org.apache.samza.util.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * The DAG of {@link OperatorImpl}s corresponding to the DAG of {@link 
OperatorSpec}s.
@@ -166,6 +166,11 @@ public class OperatorImplGraph {
       operatorImpl.init(config, context);
       operatorImpl.registerInputStream(inputStream);
 
+      if (operatorSpec.getTimerFn() != null) {
+        final TimerRegistry timerRegistry = 
operatorImpl.createOperatorTimerRegistry();
+        operatorSpec.getTimerFn().registerTimer(timerRegistry);
+      }
+
       // Note: The key here is opImplId, which may not equal opId for some 
impls (e.g. PartialJoinOperatorImpl).
       // This is currently OK since we don't need to look up a partial join 
operator impl again during traversal
       // (a join cannot have a cycle).

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java
index 6689690..2c76e60 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java
@@ -20,6 +20,7 @@
 
 package org.apache.samza.operators.spec;
 
+import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 public class BroadcastOperatorSpec<M> extends OperatorSpec<M, Void> {
@@ -40,4 +41,9 @@ public class BroadcastOperatorSpec<M> extends OperatorSpec<M, 
Void> {
   public WatermarkFunction getWatermarkFn() {
     return null;
   }
+
+  @Override
+  public TimerFunction getTimerFn() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
index 2ad0597..2ed1e30 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.KV;
+import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.system.StreamSpec;
@@ -65,5 +66,10 @@ public class InputOperatorSpec<K, V> extends 
OperatorSpec<KV<K, V>, Object> { //
   @Override
   public WatermarkFunction getWatermarkFn() {
     return null;
-  }  
+  }
+
+  @Override
+  public TimerFunction getTimerFn() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
index c730bca..9e058ff 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
@@ -20,6 +20,7 @@ package org.apache.samza.operators.spec;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.impl.store.TimestampedValueSerde;
 import org.apache.samza.operators.impl.store.TimestampedValue;
@@ -97,6 +98,11 @@ public class JoinOperatorSpec<K, M, OM, JM> extends 
OperatorSpec<Object, JM> imp
     return joinFn instanceof WatermarkFunction ? (WatermarkFunction) joinFn : 
null;
   }
 
+  @Override
+  public TimerFunction getTimerFn() {
+    return joinFn instanceof TimerFunction ? (TimerFunction) joinFn : null;
+  }
+
   public OperatorSpec getLeftInputOpSpec() {
     return leftInputOpSpec;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index 00b5318..7b0a41b 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 /**
@@ -130,4 +131,6 @@ public abstract class OperatorSpec<M, OM> {
   }
 
   abstract public WatermarkFunction getWatermarkFn();
+
+  abstract public TimerFunction getTimerFn();
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index 2a2e33a..c38f6e8 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -95,7 +95,7 @@ public class OperatorSpecs {
       public void close() {
         mapFn.close();
       }
-    }, OperatorSpec.OpCode.MAP, opId);
+    }, mapFn, OperatorSpec.OpCode.MAP, opId);
   }
 
   /**
@@ -129,7 +129,7 @@ public class OperatorSpecs {
       public void close() {
         filterFn.close();
       }
-    }, OperatorSpec.OpCode.FILTER, opId);
+    }, filterFn, OperatorSpec.OpCode.FILTER, opId);
   }
 
   /**
@@ -143,7 +143,7 @@ public class OperatorSpecs {
    */
   public static <M, OM> StreamOperatorSpec<M, OM> createFlatMapOperatorSpec(
       FlatMapFunction<? super M, ? extends OM> flatMapFn, String opId) {
-    return new StreamOperatorSpec<>((FlatMapFunction<M, OM>) flatMapFn, 
OperatorSpec.OpCode.FLAT_MAP, opId);
+    return new StreamOperatorSpec<>((FlatMapFunction<M, OM>) flatMapFn, 
flatMapFn, OperatorSpec.OpCode.FLAT_MAP, opId);
   }
 
   /**
@@ -242,7 +242,7 @@ public class OperatorSpecs {
             this.add(message);
           }
         },
-        OperatorSpec.OpCode.MERGE, opId);
+        null, OperatorSpec.OpCode.MERGE, opId);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
index 6cb4fca..40a5c0e 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.spec;
 
 
+import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 /**
@@ -56,4 +57,9 @@ public class OutputOperatorSpec<M> extends OperatorSpec<M, 
Void> {
   public WatermarkFunction getWatermarkFn() {
     return null;
   }
+
+  @Override
+  public TimerFunction getTimerFn() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
index 399c836..a0a9b61 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.KV;
+import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 import java.util.function.Function;
@@ -78,4 +79,9 @@ public class PartitionByOperatorSpec<M, K, V> extends 
OperatorSpec<M, Void> {
   public WatermarkFunction getWatermarkFn() {
     return null;
   }
+
+  @Override
+  public TimerFunction getTimerFn() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
index 9084be2..e1b51be 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
@@ -20,6 +20,7 @@ package org.apache.samza.operators.spec;
 
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.KV;
+import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.table.TableSpec;
 
@@ -62,4 +63,9 @@ public class SendToTableOperatorSpec<K, V> extends 
OperatorSpec<KV<K, V>, Void>
   public WatermarkFunction getWatermarkFn() {
     return null;
   }
+
+  @Override
+  public TimerFunction getTimerFn() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
index 1ca3801..aa0f066 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 
@@ -54,4 +55,9 @@ public class SinkOperatorSpec<M> extends OperatorSpec<M, 
Void> {
   public WatermarkFunction getWatermarkFn() {
     return sinkFn instanceof WatermarkFunction ? (WatermarkFunction) sinkFn : 
null;
   }
+
+  @Override
+  public TimerFunction getTimerFn() {
+    return sinkFn instanceof TimerFunction ? (TimerFunction) sinkFn : null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
index b1e29c6..644eb6c 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 
@@ -31,17 +32,20 @@ import 
org.apache.samza.operators.functions.WatermarkFunction;
 public class StreamOperatorSpec<M, OM> extends OperatorSpec<M, OM> {
 
   private final FlatMapFunction<M, OM> transformFn;
+  private final Object originalFn;
 
   /**
    * Constructor for a {@link StreamOperatorSpec}.
    *
    * @param transformFn  the transformation function
+   * @param originalFn the original user function before wrapping to 
transformFn
    * @param opCode  the {@link OpCode} for this {@link StreamOperatorSpec}
    * @param opId  the unique ID for this {@link StreamOperatorSpec}
    */
-  StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, OperatorSpec.OpCode 
opCode, String opId) {
+  StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, Object originalFn, 
OperatorSpec.OpCode opCode, String opId) {
     super(opCode, opId);
     this.transformFn = transformFn;
+    this.originalFn = originalFn;
   }
 
   public FlatMapFunction<M, OM> getTransformFn() {
@@ -50,6 +54,11 @@ public class StreamOperatorSpec<M, OM> extends 
OperatorSpec<M, OM> {
 
   @Override
   public WatermarkFunction getWatermarkFn() {
-    return transformFn instanceof WatermarkFunction ? (WatermarkFunction) 
transformFn : null;
+    return originalFn instanceof WatermarkFunction ? (WatermarkFunction) 
originalFn : null;
+  }
+
+  @Override
+  public TimerFunction getTimerFn() {
+    return originalFn instanceof TimerFunction ? (TimerFunction) originalFn : 
null;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
index 730913a..c7735c6 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
@@ -20,6 +20,7 @@ package org.apache.samza.operators.spec;
 
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.table.TableSpec;
 
@@ -64,4 +65,9 @@ public class StreamTableJoinOperatorSpec<K, M, R, JM> extends 
OperatorSpec<M, JM
     return joinFn instanceof WatermarkFunction ? (WatermarkFunction) joinFn : 
null;
   }
 
+  @Override
+  public TimerFunction getTimerFn() {
+    return joinFn instanceof TimerFunction ? (TimerFunction) joinFn : null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
 
b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 06a4f4b..1c8e592 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -20,6 +20,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.impl.store.TimeSeriesKeySerde;
 import org.apache.samza.operators.triggers.AnyTrigger;
@@ -123,6 +124,12 @@ public class WindowOperatorSpec<M, WK, WV> extends 
OperatorSpec<M, WindowPane<WK
   }
 
   @Override
+  public TimerFunction getTimerFn() {
+    FoldLeftFunction fn = window.getFoldLeftFunction();
+    return fn instanceof TimerFunction ? (TimerFunction) fn : null;
+  }
+
+  @Override
   public Collection<StoreDescriptor> getStoreDescriptors() {
     String storeName = getOpId();
     String storeFactory = 
"org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory";

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index b8f48c7..f4b1d41 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -304,6 +304,7 @@ public class AsyncRunLoop implements Runnable, Throttleable 
{
     COMMIT,
     PROCESS,
     END_OF_STREAM,
+    TIMER,
     NO_OP
   }
 
@@ -346,6 +347,13 @@ public class AsyncRunLoop implements Runnable, 
Throttleable {
           }
         }, commitMs, commitMs, TimeUnit.MILLISECONDS);
       }
+
+      final SystemTimerScheduler timerFactory = 
task.context().getTimerScheduler();
+      if (timerFactory != null) {
+        timerFactory.registerListener(() -> {
+            state.needTimer();
+          });
+      }
     }
 
     /**
@@ -375,6 +383,9 @@ public class AsyncRunLoop implements Runnable, Throttleable 
{
         case WINDOW:
           window();
           break;
+        case TIMER:
+          timer();
+          break;
         case COMMIT:
           commit();
           break;
@@ -514,6 +525,39 @@ public class AsyncRunLoop implements Runnable, 
Throttleable {
       }
     }
 
+    private void timer() {
+      state.startTimer();
+      Runnable timerWorker = new Runnable() {
+        @Override
+        public void run() {
+          try {
+            ReadableCoordinator coordinator = new 
ReadableCoordinator(task.taskName());
+
+            long startTime = clock.nanoTime();
+            task.timer(coordinator);
+            containerMetrics.timerNs().update(clock.nanoTime() - startTime);
+
+            coordinatorRequests.update(coordinator);
+            state.doneTimer();
+          } catch (Throwable t) {
+            log.error("Task {} timer failed", task.taskName(), t);
+            abort(t);
+          } finally {
+            log.trace("Task {} timer completed", task.taskName());
+            resume();
+          }
+        }
+      };
+
+      if (threadPool != null) {
+        log.trace("Task {} timer runs on the thread pool", task.taskName());
+        threadPool.submit(timerWorker);
+      } else {
+        log.trace("Task {} timer runs on the run loop thread", 
task.taskName());
+        timerWorker.run();
+      }
+    }
+
     /**
      * Task process completes successfully, update the offsets based on the 
high-water mark.
      * Then it will trigger the listener for task state change.
@@ -585,10 +629,12 @@ public class AsyncRunLoop implements Runnable, 
Throttleable {
   private final class AsyncTaskState {
     private volatile boolean needWindow = false;
     private volatile boolean needCommit = false;
+    private volatile boolean needTimer = false;
     private volatile boolean complete = false;
     private volatile boolean endOfStream = false;
     private volatile boolean windowInFlight = false;
     private volatile boolean commitInFlight = false;
+    private volatile boolean timerInFlight = false;
     private final AtomicInteger messagesInFlight = new AtomicInteger(0);
     private final ArrayDeque<PendingEnvelope> pendingEnvelopeQueue;
 
@@ -634,29 +680,28 @@ public class AsyncRunLoop implements Runnable, 
Throttleable {
         needCommit = true;
       }
 
-      boolean windowOrCommitInFlight = windowInFlight || commitInFlight;
+      boolean opInFlight = windowInFlight || commitInFlight || timerInFlight;
       /*
        * A task is ready to commit, when task.commit(needCommit) is requested 
either by user or commit thread
        * and either of the following conditions are true.
-       * a) When process, window, commit are not in progress.
+       * a) When process, window, commit and timer are not in progress.
        * b) When task.async.commit is true and window, commit are not in 
progress.
        */
       if (needCommit) {
-        return (messagesInFlight.get() == 0 || isAsyncCommitEnabled) && 
!windowOrCommitInFlight;
-      } else if (needWindow || endOfStream) {
+        return (messagesInFlight.get() == 0 || isAsyncCommitEnabled) && 
!opInFlight;
+      } else if (needWindow || needTimer || endOfStream) {
         /*
-         * A task is ready for window operation, when task.window(needWindow) 
is requested by either user or window thread
-         * and window, commit are not in progress.
+         * A task is ready for window, timer or end-of-stream operation.
          */
-        return messagesInFlight.get() == 0 && !windowOrCommitInFlight;
+        return messagesInFlight.get() == 0 && !opInFlight;
       } else {
         /*
          * A task is ready to process new message, when number of task.process 
calls in progress < task.max.concurrency
          * and either of the following conditions are true.
-         * a) When window, commit are not in progress.
-         * b) When task.async.commit is true and window is not in progress.
+         * a) When window, commit and timer are not in progress.
+         * b) When task.async.commit is true and window and timer are not in 
progress.
          */
-        return messagesInFlight.get() < maxConcurrency && !windowInFlight && 
(isAsyncCommitEnabled || !commitInFlight);
+        return messagesInFlight.get() < maxConcurrency && !windowInFlight && 
!timerInFlight && (isAsyncCommitEnabled || !commitInFlight);
       }
     }
 
@@ -670,6 +715,7 @@ public class AsyncRunLoop implements Runnable, Throttleable 
{
       if (isReady()) {
         if (needCommit) return WorkerOp.COMMIT;
         else if (needWindow) return WorkerOp.WINDOW;
+        else if (needTimer) return WorkerOp.TIMER;
         else if (endOfStream && pendingEnvelopeQueue.isEmpty()) return 
WorkerOp.END_OF_STREAM;
         else if (!pendingEnvelopeQueue.isEmpty()) return WorkerOp.PROCESS;
       }
@@ -684,6 +730,10 @@ public class AsyncRunLoop implements Runnable, 
Throttleable {
       needCommit = true;
     }
 
+    private void needTimer() {
+      needTimer = true;
+    }
+
     private void startWindow() {
       needWindow = false;
       windowInFlight = true;
@@ -699,6 +749,11 @@ public class AsyncRunLoop implements Runnable, 
Throttleable {
       taskMetrics.messagesInFlight().set(count);
     }
 
+    private void startTimer() {
+      needTimer = false;
+      timerInFlight = true;
+    }
+
     private void doneCommit() {
       commitInFlight = false;
     }
@@ -712,6 +767,10 @@ public class AsyncRunLoop implements Runnable, 
Throttleable {
       taskMetrics.messagesInFlight().set(count);
     }
 
+    private void doneTimer() {
+      timerInFlight = false;
+    }
+
     /**
      * Insert an PendingEnvelope into the pending envelope queue.
      * The function will be called in the run loop thread so no 
synchronization.

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java 
b/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java
new file mode 100644
index 0000000..4589058
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Per-task scheduler for keyed timers.
+ * It does the following things:
+ * 1) schedules the timer on the {@link ScheduledExecutorService}.
+ * 2) keeps track of the timers created and timers that are ready.
+ * 3) triggers listener whenever a timer fires.
+ */
+public class SystemTimerScheduler {
+
+  /**
+   * For run loop to listen to timer firing so it can schedule the callbacks.
+   */
+  public interface TimerListener {
+    void onTimer();
+  }
+
+  private final ScheduledExecutorService executor;
+  private final Map<Object, ScheduledFuture> scheduledFutures = new 
ConcurrentHashMap<>();
+  private final Map<TimerKey<?>, TimerCallback> readyTimers = new 
ConcurrentHashMap<>();
+  private TimerListener timerListener;
+
+  public static SystemTimerScheduler create(ScheduledExecutorService executor) 
{
+    return new SystemTimerScheduler(executor);
+  }
+
+  private SystemTimerScheduler(ScheduledExecutorService executor) {
+    this.executor = executor;
+  }
+
+  public <K> void setTimer(K key, long timestamp, TimerCallback<K> callback) {
+    checkState(!scheduledFutures.containsKey(key),
+        String.format("Duplicate key %s registration for the same timer", 
key));
+
+    final long delay = timestamp - System.currentTimeMillis();
+    final ScheduledFuture<?> scheduledFuture = executor.schedule(() -> {
+        readyTimers.put(TimerKey.of(key, timestamp), callback);
+
+        if (timerListener != null) {
+          timerListener.onTimer();
+        }
+      }, delay > 0 ? delay : 0, TimeUnit.MILLISECONDS);
+    scheduledFutures.put(key, scheduledFuture);
+  }
+
+  public <K> void deleteTimer(K key) {
+    final ScheduledFuture<?> scheduledFuture = scheduledFutures.remove(key);
+    if (scheduledFuture != null) {
+      scheduledFuture.cancel(false);
+    }
+  }
+
+  void registerListener(TimerListener listener) {
+    timerListener = listener;
+  }
+
+  public Map<TimerKey<?>, TimerCallback> removeReadyTimers() {
+    final Map<TimerKey<?>, TimerCallback> timers = new TreeMap<>(readyTimers);
+    readyTimers.keySet().removeAll(timers.keySet());
+    return timers;
+  }
+
+  public static class TimerKey<K> implements Comparable<TimerKey<K>> {
+    private final K key;
+    private final long time;
+
+    static <K> TimerKey<K> of(K key, long time) {
+      return new TimerKey<>(key, time);
+    }
+
+    private TimerKey(K key, long time) {
+      this.key = key;
+      this.time = time;
+    }
+
+    public K getKey() {
+      return key;
+    }
+
+    public long getTime() {
+      return time;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      TimerKey<?> timerKey = (TimerKey<?>) o;
+      if (time != ((TimerKey<?>) o).time) {
+        return false;
+      }
+      return key.equals(timerKey.key);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = key.hashCode();
+      result = 31 * result + Long.valueOf(time).hashCode();
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return "TimerKey{"
+          + "key=" + key
+          + ", time='" + time + '\''
+          + '}';
+    }
+
+    @Override
+    public int compareTo(TimerKey<K> o) {
+      final int timeCompare = Long.compare(time, o.time);
+      if (timeCompare != 0) {
+        return timeCompare;
+      }
+
+      return key.hashCode() - o.key.hashCode();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index bef5b41..789d75b 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -24,7 +24,7 @@ import java.net.{URL, UnknownHostException}
 import java.nio.file.Path
 import java.util
 import java.util.Base64
-import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
+import java.util.concurrent.{ScheduledExecutorService, ExecutorService, 
Executors, TimeUnit}
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.samza.checkpoint.{CheckpointListener, 
CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
@@ -436,6 +436,8 @@ object SamzaContainer extends Logging {
 
     val storeWatchPaths = new util.HashSet[Path]()
 
+    val timerExecutor = Executors.newSingleThreadScheduledExecutor
+
     val taskInstances: Map[TaskName, TaskInstance] = 
containerModel.getTasks.values.asScala.map(taskModel => {
       debug("Setting up task instance: %s" format taskModel)
 
@@ -563,7 +565,8 @@ object SamzaContainer extends Logging {
           systemStreamPartitions = systemStreamPartitions,
           exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics, 
config),
           jobModel = jobModel,
-          streamMetadataCache = streamMetadataCache)
+          streamMetadataCache = streamMetadataCache,
+          timerExecutor = timerExecutor)
 
       val taskInstance = createTaskInstance(task)
 
@@ -636,7 +639,8 @@ object SamzaContainer extends Logging {
       jvm = jvm,
       diskSpaceMonitor = diskSpaceMonitor,
       hostStatisticsMonitor = memoryStatisticsMonitor,
-      taskThreadPool = taskThreadPool)
+      taskThreadPool = taskThreadPool,
+      timerExecutor = timerExecutor)
   }
 }
 
@@ -656,7 +660,8 @@ class SamzaContainer(
   securityManager: SecurityManager = null,
   reporters: Map[String, MetricsReporter] = Map(),
   jvm: JvmMetrics = null,
-  taskThreadPool: ExecutorService = null) extends Runnable with Logging {
+  taskThreadPool: ExecutorService = null,
+  timerExecutor: ScheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor) extends Runnable with Logging {
 
   val shutdownMs = 
containerContext.config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS)
   var shutdownHookThread: Thread = null
@@ -1003,6 +1008,18 @@ class SamzaContainer(
       }
     }
 
+    if (timerExecutor != null) {
+      info("Shutting down timer executor")
+      try {
+        timerExecutor.shutdown()
+        if (timerExecutor.awaitTermination(shutdownMs, TimeUnit.MILLISECONDS)) 
{
+          timerExecutor.shutdownNow()
+        }
+      } catch {
+        case e: Exception => error("Ignoring exception shutting down timer 
executor", e)
+      }
+    }
+
     if (isAutoCommitEnabled) {
       info("Committing offsets for all task instances")
       taskInstances.values.foreach(_.commit)

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index d080939..c122956 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -29,11 +29,13 @@ class SamzaContainerMetrics(
 
   val commits = newCounter("commit-calls")
   val windows = newCounter("window-calls")
+  val timers = newCounter("timer-calls")
   val processes = newCounter("process-calls")
   val envelopes = newCounter("process-envelopes")
   val nullEnvelopes = newCounter("process-null-envelopes")
   val chooseNs = newTimer("choose-ns")
   val windowNs = newTimer("window-ns")
+  val timerNs = newTimer("timer-ns")
   val processNs = newTimer("process-ns")
   val commitNs = newTimer("commit-ns")
   val blockNs = newTimer("block-ns")

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index c7d76c2..cb73c5d 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -20,12 +20,15 @@
 package org.apache.samza.container
 
 
+import java.util.concurrent.ScheduledExecutorService
+
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.OffsetManager
 import org.apache.samza.config.Config
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.metrics.MetricsReporter
+import org.apache.samza.operators.functions.TimerFunction
 import org.apache.samza.storage.TaskStorageManager
 import org.apache.samza.system._
 import org.apache.samza.table.TableManager
@@ -33,6 +36,7 @@ import org.apache.samza.task._
 import org.apache.samza.util.Logging
 
 import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
 
 class TaskInstance(
   val task: Any,
@@ -50,7 +54,8 @@ class TaskInstance(
   val systemStreamPartitions: Set[SystemStreamPartition] = Set(),
   val exceptionHandler: TaskInstanceExceptionHandler = new 
TaskInstanceExceptionHandler,
   jobModel: JobModel = null,
-  streamMetadataCache: StreamMetadataCache = null) extends Logging {
+  streamMetadataCache: StreamMetadataCache = null,
+  timerExecutor : ScheduledExecutorService = null) extends Logging {
   val isInitableTask = task.isInstanceOf[InitableTask]
   val isWindowableTask = task.isInstanceOf[WindowableTask]
   val isEndOfStreamListenerTask = task.isInstanceOf[EndOfStreamListenerTask]
@@ -58,7 +63,7 @@ class TaskInstance(
   val isAsyncTask = task.isInstanceOf[AsyncStreamTask]
 
   val context = new TaskContextImpl(taskName, metrics, containerContext, 
systemStreamPartitions.asJava, offsetManager,
-                                    storageManager, tableManager, jobModel, 
streamMetadataCache)
+                                    storageManager, tableManager, jobModel, 
streamMetadataCache, timerExecutor)
 
   // store the (ssp -> if this ssp is catched up) mapping. "catched up"
   // means the same ssp in other taskInstances have the same offset as
@@ -185,6 +190,16 @@ class TaskInstance(
     }
   }
 
+  def timer(coordinator: ReadableCoordinator) {
+    trace("Timer for taskName: %s" format taskName)
+
+    exceptionHandler.maybeHandle {
+      context.getTimerScheduler.removeReadyTimers().entrySet().foreach { entry 
=>
+        
entry.getValue.asInstanceOf[TimerCallback[Any]].onTimer(entry.getKey.getKey, 
collector, coordinator)
+      }
+    }
+  }
+
   def commit {
     metrics.commits.inc
 

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index a4f0354..249ff09 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -28,6 +28,7 @@ import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.ReadableMetricsRegistry;
 import org.apache.samza.metrics.Timer;
+import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.task.MessageCollector;
@@ -217,6 +218,11 @@ public class TestOperatorImpl {
     public WatermarkFunction getWatermarkFn() {
       return null;
     }
+
+    @Override
+    public TimerFunction getTimerFn() {
+      return null;
+    }
   }
 
   public static Set<OperatorImpl> getNextOperators(OperatorImpl op) {

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 1448f79..2d8d1eb 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -220,7 +220,7 @@ public class TestOperatorImplGraph {
         new OperatorImplGraph(streamGraph, mock(Config.class), 
mockTaskContext, mock(Clock.class));
 
     // verify that the DAG after merge is only traversed & initialized once
-    verify(mockMapFunction, times(1)).init(any(Config.class), 
any(TaskContextImpl.class));
+    verify(mockMapFunction, times(1)).init(any(Config.class), 
any(TaskContext.class));
   }
 
   @Test
@@ -249,7 +249,7 @@ public class TestOperatorImplGraph {
         new OperatorImplGraph(streamGraph, mockConfig, mockTaskContext, 
mock(Clock.class));
 
     // verify that join function is initialized once.
-    verify(mockJoinFunction, times(1)).init(any(Config.class), 
any(TaskContextImpl.class));
+    verify(mockJoinFunction, times(1)).init(any(Config.class), 
any(TaskContext.class));
 
     InputOperatorImpl inputOpImpl1 = opImplGraph.getInputOperator(new 
SystemStream("input-system", "input-stream1"));
     InputOperatorImpl inputOpImpl2 = opImplGraph.getInputOperator(new 
SystemStream("input-system", "input-stream2"));

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 28a4f8b..7f54614 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -86,7 +86,8 @@ public class TestAsyncRunLoop {
     scala.collection.immutable.Set<SystemStreamPartition> sspSet = 
JavaConverters.asScalaSetConverter(Collections.singleton(ssp)).asScala().toSet();
     return new TaskInstance(task, taskName, mock(Config.class), 
taskInstanceMetrics,
         null, consumers, mock(TaskInstanceCollector.class), 
mock(SamzaContainerContext.class),
-        manager, null, null, null, sspSet, new 
TaskInstanceExceptionHandler(taskInstanceMetrics, new 
scala.collection.immutable.HashSet<String>()), null, null);
+        manager, null, null, null, sspSet, new 
TaskInstanceExceptionHandler(taskInstanceMetrics,
+        new scala.collection.immutable.HashSet<String>()), null, null, null);
   }
 
   TaskInstance createTaskInstance(AsyncStreamTask task, TaskName taskName, 
SystemStreamPartition ssp) {

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java 
b/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java
new file mode 100644
index 0000000..dd08121
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.task;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestSystemTimerScheduler {
+
+  private ScheduledExecutorService createExecutorService() {
+    ScheduledExecutorService service = mock(ScheduledExecutorService.class);
+    when(service.schedule((Runnable) anyObject(), anyLong(), 
anyObject())).thenAnswer(invocation -> {
+        Object[] args = invocation.getArguments();
+        Runnable runnable = (Runnable) args[0];
+        runnable.run();
+        return mock(ScheduledFuture.class);
+      });
+    return service;
+  }
+
+  private void fireTimers(SystemTimerScheduler factory) {
+    factory.removeReadyTimers().entrySet().forEach(entry -> {
+        entry.getValue().onTimer(entry.getKey().getKey(), 
mock(MessageCollector.class), mock(TaskCoordinator.class));
+      });
+  }
+
+  @Test
+  public void testSingleTimer() {
+    SystemTimerScheduler scheduler = 
SystemTimerScheduler.create(createExecutorService());
+    List<String> results = new ArrayList<>();
+    scheduler.setTimer("single-timer", 1, (key, collector, coordinator) -> {
+        results.add(key);
+      });
+
+    fireTimers(scheduler);
+
+    assertTrue(results.size() == 1);
+    assertEquals(results.get(0), "single-timer");
+  }
+
+  @Test
+  public void testMultipleTimers() {
+    SystemTimerScheduler scheduler = 
SystemTimerScheduler.create(createExecutorService());
+    List<String> results = new ArrayList<>();
+    scheduler.setTimer("multiple-timer-3", 3, (key, collector, coordinator) -> 
{
+        results.add(key + ":3");
+      });
+    scheduler.setTimer("multiple-timer-2", 2, (key, collector, coordinator) -> 
{
+        results.add(key + ":2");
+      });
+    scheduler.setTimer("multiple-timer-1", 1, (key, collector, coordinator) -> 
{
+        results.add(key + ":1");
+      });
+
+    fireTimers(scheduler);
+
+    assertTrue(results.size() == 3);
+    assertEquals(results.get(0), "multiple-timer-1:1");
+    assertEquals(results.get(1), "multiple-timer-2:2");
+    assertEquals(results.get(2), "multiple-timer-3:3");
+  }
+
+  @Test
+  public void testMultipleKeys() {
+    Object key1 = new Object();
+    Object key2 = new Object();
+    List<String> results = new ArrayList<>();
+
+    SystemTimerScheduler scheduler = 
SystemTimerScheduler.create(createExecutorService());
+    scheduler.setTimer(key1, 2, (key, collector, coordinator) -> {
+        assertEquals(key, key1);
+        results.add("key1:2");
+      });
+    scheduler.setTimer(key2, 1, (key, collector, coordinator) -> {
+        assertEquals(key, key2);
+        results.add("key2:1");
+      });
+
+    fireTimers(scheduler);
+
+    assertTrue(results.size() == 2);
+    assertEquals(results.get(0), "key2:1");
+    assertEquals(results.get(1), "key1:2");
+  }
+
+  @Test
+  public void testMultipleKeyTypes() {
+    String key1 = "key";
+    Long key2 = Long.MAX_VALUE;
+    List<String> results = new ArrayList<>();
+
+    SystemTimerScheduler scheduler = 
SystemTimerScheduler.create(createExecutorService());
+    scheduler.setTimer(key1, 1, (key, collector, coordinator) -> {
+        assertEquals(key, key1);
+        results.add("key:1");
+      });
+    scheduler.setTimer(key2, 2, (key, collector, coordinator) -> {
+        assertEquals(key.longValue(), Long.MAX_VALUE);
+        results.add(Long.MAX_VALUE + ":2");
+      });
+
+    fireTimers(scheduler);
+
+    assertTrue(results.size() == 2);
+    assertEquals(results.get(0), key1 + ":1");
+    assertEquals(results.get(1), key2 + ":2");
+  }
+
+  @Test
+  public void testRemoveTimer() {
+    ScheduledExecutorService service = mock(ScheduledExecutorService.class);
+    ScheduledFuture future = mock(ScheduledFuture.class);
+    when(future.cancel(anyBoolean())).thenReturn(true);
+    when(service.schedule((Runnable) anyObject(), anyLong(), 
anyObject())).thenAnswer(invocation -> future);
+
+    SystemTimerScheduler scheduler = SystemTimerScheduler.create(service);
+    List<String> results = new ArrayList<>();
+    scheduler.setTimer("timer", 1, (key, collector, coordinator) -> {
+        results.add(key);
+      });
+
+    scheduler.deleteTimer("timer");
+
+    fireTimers(scheduler);
+
+    assertTrue(results.isEmpty());
+    verify(future, times(1)).cancel(anyBoolean());
+  }
+
+  @Test
+  public void testTimerListener() {
+    SystemTimerScheduler scheduler = 
SystemTimerScheduler.create(createExecutorService());
+    List<String> results = new ArrayList<>();
+    scheduler.registerListener(() -> {
+        results.add("timer-listener");
+      });
+
+    scheduler.setTimer("timer-listener", 1, (key, collector, coordinator) -> {
+      });
+
+    fireTimers(scheduler);
+
+    assertTrue(results.size() == 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java 
b/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java
new file mode 100644
index 0000000..d8913c5
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.timer;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.TimerRegistry;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.test.operator.data.PageView;
+import org.apache.samza.test.util.StreamAssert;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public class TestTimerApp implements StreamApplication {
+  public static final String PAGE_VIEWS = "page-views";
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
+    final MessageStream<PageView> pageViews = graph.getInputStream(PAGE_VIEWS, 
serde);
+    final MessageStream<PageView> output = pageViews.flatMap(new 
FlatmapTimerFn());
+
+    StreamAssert.that("Output from timer function should container all 
complete messages", output, serde)
+        .containsInAnyOrder(
+            Arrays.asList(
+                new PageView("v1-complete", "p1", "u1"),
+                new PageView("v2-complete", "p2", "u1"),
+                new PageView("v3-complete", "p1", "u2"),
+                new PageView("v4-complete", "p3", "u2")
+            ));
+  }
+
+  private static class FlatmapTimerFn implements FlatMapFunction<PageView, 
PageView>, TimerFunction<String, PageView> {
+
+    private List<PageView> pageViews = new ArrayList<>();
+    private TimerRegistry<String> timerRegistry;
+
+    @Override
+    public void registerTimer(TimerRegistry<String> timerRegistry) {
+      this.timerRegistry = timerRegistry;
+    }
+
+    @Override
+    public Collection<PageView> apply(PageView message) {
+      final PageView pv = new PageView(message.getViewId() + "-complete", 
message.getPageId(), message.getUserId());
+      pageViews.add(pv);
+
+      if (pageViews.size() == 2) {
+        //got all messages for this task
+        final long time = System.currentTimeMillis() + 100;
+        timerRegistry.register("CompleteTimer", time);
+      }
+      return Collections.emptyList();
+    }
+
+    @Override
+    public Collection<PageView> onTimer(String key, long time) {
+      return pageViews;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-test/src/test/java/org/apache/samza/test/timer/TimerTest.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/timer/TimerTest.java 
b/samza-test/src/test/java/org/apache/samza/test/timer/TimerTest.java
new file mode 100644
index 0000000..11b3aeb
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/timer/TimerTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.timer;
+
+import org.apache.samza.test.operator.StreamApplicationIntegrationTestHarness;
+import org.junit.Before;
+import org.junit.Test;
+
+
+import static org.apache.samza.test.timer.TestTimerApp.PAGE_VIEWS;
+
+public class TimerTest extends StreamApplicationIntegrationTestHarness {
+
+  @Before
+  public void setup() {
+    // create topics
+    createTopic(PAGE_VIEWS, 2);
+
+    // create events for the following user activity.
+    // userId: (viewId, pageId, (adIds))
+    // u1: (v1, p1, (a1)), (v2, p2, (a3))
+    // u2: (v3, p1, (a1)), (v4, p3, (a5))
+    produceMessage(PAGE_VIEWS, 0, "p1", 
"{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}");
+    produceMessage(PAGE_VIEWS, 1, "p2", 
"{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}");
+    produceMessage(PAGE_VIEWS, 0, "p1", 
"{\"viewId\":\"v3\",\"pageId\":\"p1\",\"userId\":\"u2\"}");
+    produceMessage(PAGE_VIEWS, 1, "p3", 
"{\"viewId\":\"v4\",\"pageId\":\"p3\",\"userId\":\"u2\"}");
+
+  }
+
+  @Test
+  public void testJob() {
+    runApplication(new TestTimerApp(), "TimerTest", null);
+  }
+}

Reply via email to