Repository: samza Updated Branches: refs/heads/master 78ee98261 -> aa0586558
SAMZA-1498: Support arbitrary system clock timer in operators This patch adds the capability to register arbitrary timers for both the high-level and low-level APIs. For the high-level API, InitableFunction will pass the TimerRegistry to the user through the new OpContext, and the user will implement the TimerFunction to get timer notifications. For the low-level API, the user can register a timer in the TaskContext and then implement the TimerCallback for specific timer actions. Author: xiliu <xi...@linkedin.com> Author: xinyuiscool <xinyuliu...@gmail.com> Author: xinyuiscool <xi...@linkedin.com> Reviewers: Pateek M <prate...@gmail.com> Closes #419 from xinyuiscool/SAMZA-1498 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/aa058655 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/aa058655 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/aa058655 Branch: refs/heads/master Commit: aa058655890735274f679f009db2a64f87ec3c04 Parents: 78ee982 Author: xiliu <xi...@linkedin.com> Authored: Tue Mar 6 18:20:15 2018 -0800 Committer: xiliu <xi...@linkedin.com> Committed: Tue Mar 6 18:20:15 2018 -0800 ---------------------------------------------------------------------- .../apache/samza/operators/TimerRegistry.java | 41 +++++ .../operators/functions/InitableFunction.java | 3 +- .../operators/functions/TimerFunction.java | 65 +++++++ .../java/org/apache/samza/task/TaskContext.java | 19 ++ .../org/apache/samza/task/TimerCallback.java | 34 ++++ .../apache/samza/container/TaskContextImpl.java | 31 +++- .../samza/operators/impl/OperatorImpl.java | 42 ++++- .../samza/operators/impl/OperatorImplGraph.java | 31 ++-- .../operators/spec/BroadcastOperatorSpec.java | 6 + .../samza/operators/spec/InputOperatorSpec.java | 8 +- .../samza/operators/spec/JoinOperatorSpec.java | 6 + .../samza/operators/spec/OperatorSpec.java | 3 + .../samza/operators/spec/OperatorSpecs.java | 8 +- .../operators/spec/OutputOperatorSpec.java | 6 + 
.../operators/spec/PartitionByOperatorSpec.java | 6 + .../operators/spec/SendToTableOperatorSpec.java | 6 + .../samza/operators/spec/SinkOperatorSpec.java | 6 + .../operators/spec/StreamOperatorSpec.java | 13 +- .../spec/StreamTableJoinOperatorSpec.java | 6 + .../operators/spec/WindowOperatorSpec.java | 7 + .../org/apache/samza/task/AsyncRunLoop.java | 79 +++++++-- .../apache/samza/task/SystemTimerScheduler.java | 154 ++++++++++++++++ .../apache/samza/container/SamzaContainer.scala | 25 ++- .../samza/container/SamzaContainerMetrics.scala | 2 + .../apache/samza/container/TaskInstance.scala | 19 +- .../samza/operators/impl/TestOperatorImpl.java | 6 + .../operators/impl/TestOperatorImplGraph.java | 4 +- .../org/apache/samza/task/TestAsyncRunLoop.java | 3 +- .../samza/task/TestSystemTimerScheduler.java | 176 +++++++++++++++++++ .../apache/samza/test/timer/TestTimerApp.java | 86 +++++++++ .../org/apache/samza/test/timer/TimerTest.java | 51 ++++++ 31 files changed, 904 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-api/src/main/java/org/apache/samza/operators/TimerRegistry.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/TimerRegistry.java b/samza-api/src/main/java/org/apache/samza/operators/TimerRegistry.java new file mode 100644 index 0000000..64dd4ec --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/TimerRegistry.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators; + +/** + * Allows registering epoch-time timer callbacks from the operators. + * See {@link org.apache.samza.operators.functions.TimerFunction} for details. + * @param <K> type of the timer key + */ +public interface TimerRegistry<K> { + + /** + * Register an epoch-time timer with the given key. + * @param key unique timer key + * @param timestamp epoch time when the timer will be fired, in milliseconds + */ + void register(K key, long timestamp); + + /** + * Delete the timer for the provided key. + * @param key key for the timer to delete + */ + void delete(K key); +} http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java index b08c6cd..6651819 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java @@ -16,13 +16,13 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.samza.operators.functions; import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.config.Config; import org.apache.samza.task.TaskContext; - /** * A function that can be initialized before execution. 
* @@ -41,5 +41,4 @@ public interface InitableFunction { * @param context the {@link TaskContext} for this task */ default void init(Config config, TaskContext context) { } - } http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java new file mode 100644 index 0000000..01825c6 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.functions; + +import org.apache.samza.operators.TimerRegistry; + +import java.util.Collection; + +/** + * Allows timer registration with a key and is invoked when the timer is fired. + * Key must be a unique identifier for this timer, and is provided in the callback when the timer fires. 
+ * + * <p> + * Example of a {@link FlatMapFunction} with timer: + * <pre>{@code + * public class ExampleTimerFn implements FlatMapFunction<String, String>, TimerFunction<String, String> { + * public void registerTimer(TimerRegistry timerRegistry) { + * long time = System.currentTimeMillis() + 5000; // fire after 5 sec + * timerRegistry.register("example-timer", time); + * } + * public Collection<String> apply(String s) { + * ... + * } + * public Collection<String> onTimer(String key, long timestamp) { + * // example-timer fired + * ... + * } + * } + * }</pre> + * @param <K> type of the key + * @param <OM> type of the output + */ +public interface TimerFunction<K, OM> { + + /** + * Registers any epoch-time timers using the registry + * @param timerRegistry a keyed {@link TimerRegistry} + */ + void registerTimer(TimerRegistry<K> timerRegistry); + + /** + * Returns the output after the timer with key fires. + * @param key timer key + * @param timestamp time of the epoch-time timer fired, in milliseconds + * @return {@link Collection} of output elements + */ + Collection<OM> onTimer(K key, long timestamp); +} http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-api/src/main/java/org/apache/samza/task/TaskContext.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java index 11ffacc..ea2a3bc 100644 --- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java +++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java @@ -74,4 +74,23 @@ public interface TaskContext { default Object getUserContext() { return null; } + + /** + * Register a keyed timer with a callback of {@link TimerCallback} in this task. + * The callback will be invoked exclusively with any other operations for this task, + * e.g. processing, windowing and commit. 
+ * @param key timer key + * @param timestamp epoch time when the timer will be fired, in milliseconds + * @param callback callback when the timer is fired + * @param <K> type of the key + */ + <K> void registerTimer(K key, long timestamp, TimerCallback<K> callback); + + /** + * Delete the keyed timer in this task. + * Deletion only happens if the timer hasn't been fired. Otherwise it will not interrupt. + * @param key timer key + * @param <K> type of the key + */ + <K> void deleteTimer(K key); } http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-api/src/main/java/org/apache/samza/task/TimerCallback.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/TimerCallback.java b/samza-api/src/main/java/org/apache/samza/task/TimerCallback.java new file mode 100644 index 0000000..3add129 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/task/TimerCallback.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task; + +/** + * The callback that is invoked when its corresponding timer registered via {@link TaskContext} fires. 
+ * @param <K> type of the timer key + */ +public interface TimerCallback<K> { + /** + * Invoked when the timer of key fires. + * @param key timer key + * @param collector contains the means of sending message envelopes to the output stream. + * @param coordinator manages execution of tasks. + */ + void onTimer(K key, MessageCollector collector, TaskCoordinator coordinator); +} http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java index 0248486..0d76a33 100644 --- a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java +++ b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java @@ -19,10 +19,7 @@ package org.apache.samza.container; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - +import com.google.common.collect.ImmutableSet; import org.apache.samza.checkpoint.OffsetManager; import org.apache.samza.job.model.JobModel; import org.apache.samza.metrics.ReadableMetricsRegistry; @@ -32,11 +29,16 @@ import org.apache.samza.system.StreamMetadataCache; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.table.Table; import org.apache.samza.table.TableManager; +import org.apache.samza.task.SystemTimerScheduler; import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TimerCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.ImmutableSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; public class TaskContextImpl implements TaskContext { private static final Logger LOG = LoggerFactory.getLogger(TaskContextImpl.class); @@ -51,6 +53,7 @@ 
public class TaskContextImpl implements TaskContext { private final JobModel jobModel; private final StreamMetadataCache streamMetadataCache; private final Map<String, Object> objectRegistry = new HashMap<>(); + private final SystemTimerScheduler timerScheduler; private Object userContext = null; @@ -62,7 +65,8 @@ public class TaskContextImpl implements TaskContext { TaskStorageManager storageManager, TableManager tableManager, JobModel jobModel, - StreamMetadataCache streamMetadataCache) { + StreamMetadataCache streamMetadataCache, + ScheduledExecutorService timerExecutor) { this.taskName = taskName; this.metrics = metrics; this.containerContext = containerContext; @@ -72,6 +76,7 @@ public class TaskContextImpl implements TaskContext { this.tableManager = tableManager; this.jobModel = jobModel; this.streamMetadataCache = streamMetadataCache; + this.timerScheduler = SystemTimerScheduler.create(timerExecutor); } @Override @@ -129,6 +134,16 @@ public class TaskContextImpl implements TaskContext { return userContext; } + @Override + public <K> void registerTimer(K key, long timestamp, TimerCallback<K> callback) { + timerScheduler.setTimer(key, timestamp, callback); + } + + @Override + public <K> void deleteTimer(K key) { + timerScheduler.deleteTimer(key); + } + public void registerObject(String name, Object value) { objectRegistry.put(name, value); } @@ -144,4 +159,8 @@ public class TaskContextImpl implements TaskContext { public StreamMetadataCache getStreamMetadataCache() { return streamMetadataCache; } + + public SystemTimerScheduler getTimerScheduler() { + return timerScheduler; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index 
15b763d..7219180 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -25,6 +25,8 @@ import org.apache.samza.container.TaskContextImpl; import org.apache.samza.container.TaskName; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.TaskModel; +import org.apache.samza.operators.TimerRegistry; +import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.system.EndOfStreamMessage; import org.apache.samza.metrics.Counter; @@ -80,6 +82,7 @@ public abstract class OperatorImpl<M, RM> { private EndOfStreamStates eosStates; // watermark states private WatermarkStates watermarkStates; + private TaskContext taskContext; /** * Initialize this {@link OperatorImpl} and its user-defined functions. @@ -121,7 +124,8 @@ public abstract class OperatorImpl<M, RM> { this.usedInCurrentTask = true; } - handleInit(config, context); + this.taskContext = taskContext; + handleInit(config, taskContext); initialized = true; } @@ -415,6 +419,42 @@ public abstract class OperatorImpl<M, RM> { } } + /** + * Returns a registry which allows registering arbitrary system-clock timer with K-typed key. + * The user-defined function in the operator spec needs to implement {@link TimerFunction#onTimer(Object, long)} + * for timer notifications. + * @param <K> key type for the timer. 
+ * @return an instance of {@link TimerRegistry} + */ + <K> TimerRegistry<K> createOperatorTimerRegistry() { + return new TimerRegistry<K>() { + @Override + public void register(K key, long time) { + taskContext.registerTimer(key, time, (k, collector, coordinator) -> { + final TimerFunction<K, RM> timerFn = getOperatorSpec().getTimerFn(); + if (timerFn != null) { + final Collection<RM> output = timerFn.onTimer(key, time); + + if (!output.isEmpty()) { + output.forEach(rm -> + registeredOperators.forEach(op -> + op.onMessage(rm, collector, coordinator))); + } + } else { + throw new SamzaException( + String.format("Operator %s id %s (created at %s) must implement TimerFunction to use system timer.", + getOperatorSpec().getOpCode().name(), getOpImplId(), getOperatorSpec().getSourceLocation())); + } + }); + } + + @Override + public void delete(K key) { + taskContext.deleteTimer(key); + } + }; + } + public void close() { if (closed) { throw new IllegalStateException( http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java index 3882544..bbc8783 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java @@ -18,20 +18,15 @@ */ package org.apache.samza.operators.impl; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; 
import org.apache.samza.config.Config; import org.apache.samza.container.TaskContextImpl; import org.apache.samza.job.model.JobModel; import org.apache.samza.operators.KV; import org.apache.samza.operators.StreamGraphImpl; +import org.apache.samza.operators.TimerRegistry; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.PartialJoinFunction; import org.apache.samza.operators.impl.store.TimestampedValue; @@ -41,11 +36,11 @@ import org.apache.samza.operators.spec.JoinOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.OutputOperatorSpec; import org.apache.samza.operators.spec.PartitionByOperatorSpec; +import org.apache.samza.operators.spec.SendToTableOperatorSpec; import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec; import org.apache.samza.operators.spec.WindowOperatorSpec; -import org.apache.samza.operators.spec.SendToTableOperatorSpec; import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.system.SystemStream; import org.apache.samza.task.TaskContext; @@ -53,9 +48,14 @@ import org.apache.samza.util.Clock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Multimap; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; /** * The DAG of {@link OperatorImpl}s corresponding to the DAG of {@link OperatorSpec}s. 
@@ -166,6 +166,11 @@ public class OperatorImplGraph { operatorImpl.init(config, context); operatorImpl.registerInputStream(inputStream); + if (operatorSpec.getTimerFn() != null) { + final TimerRegistry timerRegistry = operatorImpl.createOperatorTimerRegistry(); + operatorSpec.getTimerFn().registerTimer(timerRegistry); + } + // Note: The key here is opImplId, which may not equal opId for some impls (e.g. PartialJoinOperatorImpl). // This is currently OK since we don't need to look up a partial join operator impl again during traversal // (a join cannot have a cycle). http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java index 6689690..2c76e60 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java @@ -20,6 +20,7 @@ package org.apache.samza.operators.spec; +import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; public class BroadcastOperatorSpec<M> extends OperatorSpec<M, Void> { @@ -40,4 +41,9 @@ public class BroadcastOperatorSpec<M> extends OperatorSpec<M, Void> { public WatermarkFunction getWatermarkFn() { return null; } + + @Override + public TimerFunction getTimerFn() { + return null; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java 
b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java index 2ad0597..2ed1e30 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java @@ -19,6 +19,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.KV; +import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.serializers.Serde; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.system.StreamSpec; @@ -65,5 +66,10 @@ public class InputOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Object> { // @Override public WatermarkFunction getWatermarkFn() { return null; - } + } + + @Override + public TimerFunction getTimerFn() { + return null; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java index c730bca..9e058ff 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java @@ -20,6 +20,7 @@ package org.apache.samza.operators.spec; import com.google.common.collect.ImmutableMap; import org.apache.samza.operators.functions.JoinFunction; +import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.impl.store.TimestampedValueSerde; import org.apache.samza.operators.impl.store.TimestampedValue; @@ -97,6 +98,11 @@ public class JoinOperatorSpec<K, M, OM, JM> extends OperatorSpec<Object, JM> imp return joinFn instanceof WatermarkFunction ? 
(WatermarkFunction) joinFn : null; } + @Override + public TimerFunction getTimerFn() { + return joinFn instanceof TimerFunction ? (TimerFunction) joinFn : null; + } + public OperatorSpec getLeftInputOpSpec() { return leftInputOpSpec; } http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java index 00b5318..7b0a41b 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java @@ -25,6 +25,7 @@ import java.util.Set; import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; /** @@ -130,4 +131,6 @@ public abstract class OperatorSpec<M, OM> { } abstract public WatermarkFunction getWatermarkFn(); + + abstract public TimerFunction getTimerFn(); } http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java index 2a2e33a..c38f6e8 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java @@ -95,7 +95,7 @@ public class OperatorSpecs { public void close() { mapFn.close(); } - }, OperatorSpec.OpCode.MAP, opId); + }, mapFn, 
OperatorSpec.OpCode.MAP, opId); } /** @@ -129,7 +129,7 @@ public class OperatorSpecs { public void close() { filterFn.close(); } - }, OperatorSpec.OpCode.FILTER, opId); + }, filterFn, OperatorSpec.OpCode.FILTER, opId); } /** @@ -143,7 +143,7 @@ public class OperatorSpecs { */ public static <M, OM> StreamOperatorSpec<M, OM> createFlatMapOperatorSpec( FlatMapFunction<? super M, ? extends OM> flatMapFn, String opId) { - return new StreamOperatorSpec<>((FlatMapFunction<M, OM>) flatMapFn, OperatorSpec.OpCode.FLAT_MAP, opId); + return new StreamOperatorSpec<>((FlatMapFunction<M, OM>) flatMapFn, flatMapFn, OperatorSpec.OpCode.FLAT_MAP, opId); } /** @@ -242,7 +242,7 @@ public class OperatorSpecs { this.add(message); } }, - OperatorSpec.OpCode.MERGE, opId); + null, OperatorSpec.OpCode.MERGE, opId); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java index 6cb4fca..40a5c0e 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java @@ -19,6 +19,7 @@ package org.apache.samza.operators.spec; +import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; /** @@ -56,4 +57,9 @@ public class OutputOperatorSpec<M> extends OperatorSpec<M, Void> { public WatermarkFunction getWatermarkFn() { return null; } + + @Override + public TimerFunction getTimerFn() { + return null; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java 
---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java index 399c836..a0a9b61 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java @@ -19,6 +19,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.KV; +import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; import java.util.function.Function; @@ -78,4 +79,9 @@ public class PartitionByOperatorSpec<M, K, V> extends OperatorSpec<M, Void> { public WatermarkFunction getWatermarkFn() { return null; } + + @Override + public TimerFunction getTimerFn() { + return null; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java index 9084be2..e1b51be 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java @@ -20,6 +20,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.operators.KV; +import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.table.TableSpec; @@ -62,4 +63,9 @@ public class SendToTableOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Void> public 
WatermarkFunction getWatermarkFn() { return null; } + + @Override + public TimerFunction getTimerFn() { + return null; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java index 1ca3801..aa0f066 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java @@ -19,6 +19,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; @@ -54,4 +55,9 @@ public class SinkOperatorSpec<M> extends OperatorSpec<M, Void> { public WatermarkFunction getWatermarkFn() { return sinkFn instanceof WatermarkFunction ? (WatermarkFunction) sinkFn : null; } + + @Override + public TimerFunction getTimerFn() { + return sinkFn instanceof TimerFunction ? 
(TimerFunction) sinkFn : null; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java index b1e29c6..644eb6c 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java @@ -19,6 +19,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; @@ -31,17 +32,20 @@ import org.apache.samza.operators.functions.WatermarkFunction; public class StreamOperatorSpec<M, OM> extends OperatorSpec<M, OM> { private final FlatMapFunction<M, OM> transformFn; + private final Object originalFn; /** * Constructor for a {@link StreamOperatorSpec}. * * @param transformFn the transformation function + * @param originalFn the original user function before wrapping to transformFn * @param opCode the {@link OpCode} for this {@link StreamOperatorSpec} * @param opId the unique ID for this {@link StreamOperatorSpec} */ - StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, OperatorSpec.OpCode opCode, String opId) { + StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, Object originalFn, OperatorSpec.OpCode opCode, String opId) { super(opCode, opId); this.transformFn = transformFn; + this.originalFn = originalFn; } public FlatMapFunction<M, OM> getTransformFn() { @@ -50,6 +54,11 @@ public class StreamOperatorSpec<M, OM> extends OperatorSpec<M, OM> { @Override public WatermarkFunction getWatermarkFn() { - return transformFn instanceof WatermarkFunction ? 
(WatermarkFunction) transformFn : null; + return originalFn instanceof WatermarkFunction ? (WatermarkFunction) originalFn : null; + } + + @Override + public TimerFunction getTimerFn() { + return originalFn instanceof TimerFunction ? (TimerFunction) originalFn : null; } } http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java index 730913a..c7735c6 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java @@ -20,6 +20,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.operators.functions.StreamTableJoinFunction; +import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.table.TableSpec; @@ -64,4 +65,9 @@ public class StreamTableJoinOperatorSpec<K, M, R, JM> extends OperatorSpec<M, JM return joinFn instanceof WatermarkFunction ? (WatermarkFunction) joinFn : null; } + @Override + public TimerFunction getTimerFn() { + return joinFn instanceof TimerFunction ? 
(TimerFunction) joinFn : null; + } + } http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java index 06a4f4b..1c8e592 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java @@ -20,6 +20,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.functions.FoldLeftFunction; +import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.impl.store.TimeSeriesKeySerde; import org.apache.samza.operators.triggers.AnyTrigger; @@ -123,6 +124,12 @@ public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK } @Override + public TimerFunction getTimerFn() { + FoldLeftFunction fn = window.getFoldLeftFunction(); + return fn instanceof TimerFunction ? 
(TimerFunction) fn : null; + } + + @Override public Collection<StoreDescriptor> getStoreDescriptors() { String storeName = getOpId(); String storeFactory = "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory"; http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java index b8f48c7..f4b1d41 100644 --- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java +++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java @@ -304,6 +304,7 @@ public class AsyncRunLoop implements Runnable, Throttleable { COMMIT, PROCESS, END_OF_STREAM, + TIMER, NO_OP } @@ -346,6 +347,13 @@ public class AsyncRunLoop implements Runnable, Throttleable { } }, commitMs, commitMs, TimeUnit.MILLISECONDS); } + + final SystemTimerScheduler timerFactory = task.context().getTimerScheduler(); + if (timerFactory != null) { + timerFactory.registerListener(() -> { + state.needTimer(); + }); + } } /** @@ -375,6 +383,9 @@ public class AsyncRunLoop implements Runnable, Throttleable { case WINDOW: window(); break; + case TIMER: + timer(); + break; case COMMIT: commit(); break; @@ -514,6 +525,39 @@ public class AsyncRunLoop implements Runnable, Throttleable { } } + private void timer() { + state.startTimer(); + Runnable timerWorker = new Runnable() { + @Override + public void run() { + try { + ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName()); + + long startTime = clock.nanoTime(); + task.timer(coordinator); + containerMetrics.timerNs().update(clock.nanoTime() - startTime); + + coordinatorRequests.update(coordinator); + state.doneTimer(); + } catch (Throwable t) { + log.error("Task {} timer failed", task.taskName(), t); + abort(t); + } finally { + log.trace("Task {} timer 
completed", task.taskName()); + resume(); + } + } + }; + + if (threadPool != null) { + log.trace("Task {} timer runs on the thread pool", task.taskName()); + threadPool.submit(timerWorker); + } else { + log.trace("Task {} timer runs on the run loop thread", task.taskName()); + timerWorker.run(); + } + } + /** * Task process completes successfully, update the offsets based on the high-water mark. * Then it will trigger the listener for task state change. @@ -585,10 +629,12 @@ public class AsyncRunLoop implements Runnable, Throttleable { private final class AsyncTaskState { private volatile boolean needWindow = false; private volatile boolean needCommit = false; + private volatile boolean needTimer = false; private volatile boolean complete = false; private volatile boolean endOfStream = false; private volatile boolean windowInFlight = false; private volatile boolean commitInFlight = false; + private volatile boolean timerInFlight = false; private final AtomicInteger messagesInFlight = new AtomicInteger(0); private final ArrayDeque<PendingEnvelope> pendingEnvelopeQueue; @@ -634,29 +680,28 @@ public class AsyncRunLoop implements Runnable, Throttleable { needCommit = true; } - boolean windowOrCommitInFlight = windowInFlight || commitInFlight; + boolean opInFlight = windowInFlight || commitInFlight || timerInFlight; /* * A task is ready to commit, when task.commit(needCommit) is requested either by user or commit thread * and either of the following conditions are true. - * a) When process, window, commit are not in progress. + * a) When process, window, commit and timer are not in progress. * b) When task.async.commit is true and window, commit are not in progress. 
*/ if (needCommit) { - return (messagesInFlight.get() == 0 || isAsyncCommitEnabled) && !windowOrCommitInFlight; - } else if (needWindow || endOfStream) { + return (messagesInFlight.get() == 0 || isAsyncCommitEnabled) && !opInFlight; + } else if (needWindow || needTimer || endOfStream) { /* - * A task is ready for window operation, when task.window(needWindow) is requested by either user or window thread - * and window, commit are not in progress. + * A task is ready for window, timer or end-of-stream operation. */ - return messagesInFlight.get() == 0 && !windowOrCommitInFlight; + return messagesInFlight.get() == 0 && !opInFlight; } else { /* * A task is ready to process new message, when number of task.process calls in progress < task.max.concurrency * and either of the following conditions are true. - * a) When window, commit are not in progress. - * b) When task.async.commit is true and window is not in progress. + * a) When window, commit and timer are not in progress. + * b) When task.async.commit is true and window and timer are not in progress. 
*/ - return messagesInFlight.get() < maxConcurrency && !windowInFlight && (isAsyncCommitEnabled || !commitInFlight); + return messagesInFlight.get() < maxConcurrency && !windowInFlight && !timerInFlight && (isAsyncCommitEnabled || !commitInFlight); } } @@ -670,6 +715,7 @@ public class AsyncRunLoop implements Runnable, Throttleable { if (isReady()) { if (needCommit) return WorkerOp.COMMIT; else if (needWindow) return WorkerOp.WINDOW; + else if (needTimer) return WorkerOp.TIMER; else if (endOfStream && pendingEnvelopeQueue.isEmpty()) return WorkerOp.END_OF_STREAM; else if (!pendingEnvelopeQueue.isEmpty()) return WorkerOp.PROCESS; } @@ -684,6 +730,10 @@ public class AsyncRunLoop implements Runnable, Throttleable { needCommit = true; } + private void needTimer() { + needTimer = true; + } + private void startWindow() { needWindow = false; windowInFlight = true; @@ -699,6 +749,11 @@ public class AsyncRunLoop implements Runnable, Throttleable { taskMetrics.messagesInFlight().set(count); } + private void startTimer() { + needTimer = false; + timerInFlight = true; + } + private void doneCommit() { commitInFlight = false; } @@ -712,6 +767,10 @@ public class AsyncRunLoop implements Runnable, Throttleable { taskMetrics.messagesInFlight().set(count); } + private void doneTimer() { + timerInFlight = false; + } + /** * Insert an PendingEnvelope into the pending envelope queue. * The function will be called in the run loop thread so no synchronization. 
http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java b/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java new file mode 100644 index 0000000..4589058 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.task; + +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static com.google.common.base.Preconditions.checkState; + +/** + * Per-task scheduler for keyed timers. + * It does the following things: + * 1) schedules the timer on the {@link ScheduledExecutorService}. + * 2) keeps track of the timers created and timers that are ready. + * 3) triggers listener whenever a timer fires. 
+ */ +public class SystemTimerScheduler { + + /** + * For run loop to listen to timer firing so it can schedule the callbacks. + */ + public interface TimerListener { + void onTimer(); + } + + private final ScheduledExecutorService executor; + private final Map<Object, ScheduledFuture> scheduledFutures = new ConcurrentHashMap<>(); + private final Map<TimerKey<?>, TimerCallback> readyTimers = new ConcurrentHashMap<>(); + private TimerListener timerListener; + + public static SystemTimerScheduler create(ScheduledExecutorService executor) { + return new SystemTimerScheduler(executor); + } + + private SystemTimerScheduler(ScheduledExecutorService executor) { + this.executor = executor; + } + + public <K> void setTimer(K key, long timestamp, TimerCallback<K> callback) { + checkState(!scheduledFutures.containsKey(key), + String.format("Duplicate key %s registration for the same timer", key)); + + final long delay = timestamp - System.currentTimeMillis(); + final ScheduledFuture<?> scheduledFuture = executor.schedule(() -> { + readyTimers.put(TimerKey.of(key, timestamp), callback); + + if (timerListener != null) { + timerListener.onTimer(); + } + }, delay > 0 ? 
delay : 0, TimeUnit.MILLISECONDS); + scheduledFutures.put(key, scheduledFuture); + } + + public <K> void deleteTimer(K key) { + final ScheduledFuture<?> scheduledFuture = scheduledFutures.remove(key); + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + } + } + + void registerListener(TimerListener listener) { + timerListener = listener; + } + + public Map<TimerKey<?>, TimerCallback> removeReadyTimers() { + final Map<TimerKey<?>, TimerCallback> timers = new TreeMap<>(readyTimers); + readyTimers.keySet().removeAll(timers.keySet()); + return timers; + } + + public static class TimerKey<K> implements Comparable<TimerKey<K>> { + private final K key; + private final long time; + + static <K> TimerKey<K> of(K key, long time) { + return new TimerKey<>(key, time); + } + + private TimerKey(K key, long time) { + this.key = key; + this.time = time; + } + + public K getKey() { + return key; + } + + public long getTime() { + return time; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + TimerKey<?> timerKey = (TimerKey<?>) o; + if (time != ((TimerKey<?>) o).time) { + return false; + } + return key.equals(timerKey.key); + } + + @Override + public int hashCode() { + int result = key.hashCode(); + result = 31 * result + Long.valueOf(time).hashCode(); + return result; + } + + @Override + public String toString() { + return "TimerKey{" + + "key=" + key + + ", time='" + time + '\'' + + '}'; + } + + @Override + public int compareTo(TimerKey<K> o) { + final int timeCompare = Long.compare(time, o.time); + if (timeCompare != 0) { + return timeCompare; + } + + return key.hashCode() - o.key.hashCode(); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index bef5b41..789d75b 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -24,7 +24,7 @@ import java.net.{URL, UnknownHostException} import java.nio.file.Path import java.util import java.util.Base64 -import java.util.concurrent.{ExecutorService, Executors, TimeUnit} +import java.util.concurrent.{ScheduledExecutorService, ExecutorService, Executors, TimeUnit} import com.google.common.util.concurrent.ThreadFactoryBuilder import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics} @@ -436,6 +436,8 @@ object SamzaContainer extends Logging { val storeWatchPaths = new util.HashSet[Path]() + val timerExecutor = Executors.newSingleThreadScheduledExecutor + val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.asScala.map(taskModel => { debug("Setting up task instance: %s" format taskModel) @@ -563,7 +565,8 @@ object SamzaContainer extends Logging { systemStreamPartitions = systemStreamPartitions, exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics, config), jobModel = jobModel, - streamMetadataCache = streamMetadataCache) + streamMetadataCache = streamMetadataCache, + timerExecutor = timerExecutor) val taskInstance = createTaskInstance(task) @@ -636,7 +639,8 @@ object SamzaContainer extends Logging { jvm = jvm, diskSpaceMonitor = diskSpaceMonitor, hostStatisticsMonitor = memoryStatisticsMonitor, - taskThreadPool = taskThreadPool) + taskThreadPool = taskThreadPool, + timerExecutor = timerExecutor) } } @@ -656,7 +660,8 @@ class SamzaContainer( securityManager: SecurityManager = null, reporters: Map[String, MetricsReporter] = Map(), jvm: JvmMetrics = null, - taskThreadPool: ExecutorService = null) extends 
Runnable with Logging { + taskThreadPool: ExecutorService = null, + timerExecutor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor) extends Runnable with Logging { val shutdownMs = containerContext.config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS) var shutdownHookThread: Thread = null @@ -1003,6 +1008,18 @@ class SamzaContainer( } } + if (timerExecutor != null) { + info("Shutting down timer executor") + try { + timerExecutor.shutdown() + if (timerExecutor.awaitTermination(shutdownMs, TimeUnit.MILLISECONDS)) { + timerExecutor.shutdownNow() + } + } catch { + case e: Exception => error("Ignoring exception shutting down timer executor", e) + } + } + if (isAutoCommitEnabled) { info("Committing offsets for all task instances") taskInstances.values.foreach(_.commit) http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala index d080939..c122956 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala @@ -29,11 +29,13 @@ class SamzaContainerMetrics( val commits = newCounter("commit-calls") val windows = newCounter("window-calls") + val timers = newCounter("timer-calls") val processes = newCounter("process-calls") val envelopes = newCounter("process-envelopes") val nullEnvelopes = newCounter("process-null-envelopes") val chooseNs = newTimer("choose-ns") val windowNs = newTimer("window-ns") + val timerNs = newTimer("timer-ns") val processNs = newTimer("process-ns") val commitNs = newTimer("commit-ns") val blockNs = newTimer("block-ns") 
http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index c7d76c2..cb73c5d 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -20,12 +20,15 @@ package org.apache.samza.container +import java.util.concurrent.ScheduledExecutorService + import org.apache.samza.SamzaException import org.apache.samza.checkpoint.OffsetManager import org.apache.samza.config.Config import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.job.model.JobModel import org.apache.samza.metrics.MetricsReporter +import org.apache.samza.operators.functions.TimerFunction import org.apache.samza.storage.TaskStorageManager import org.apache.samza.system._ import org.apache.samza.table.TableManager @@ -33,6 +36,7 @@ import org.apache.samza.task._ import org.apache.samza.util.Logging import scala.collection.JavaConverters._ +import scala.collection.JavaConversions._ class TaskInstance( val task: Any, @@ -50,7 +54,8 @@ class TaskInstance( val systemStreamPartitions: Set[SystemStreamPartition] = Set(), val exceptionHandler: TaskInstanceExceptionHandler = new TaskInstanceExceptionHandler, jobModel: JobModel = null, - streamMetadataCache: StreamMetadataCache = null) extends Logging { + streamMetadataCache: StreamMetadataCache = null, + timerExecutor : ScheduledExecutorService = null) extends Logging { val isInitableTask = task.isInstanceOf[InitableTask] val isWindowableTask = task.isInstanceOf[WindowableTask] val isEndOfStreamListenerTask = task.isInstanceOf[EndOfStreamListenerTask] @@ -58,7 +63,7 @@ class TaskInstance( val isAsyncTask = 
task.isInstanceOf[AsyncStreamTask] val context = new TaskContextImpl(taskName, metrics, containerContext, systemStreamPartitions.asJava, offsetManager, - storageManager, tableManager, jobModel, streamMetadataCache) + storageManager, tableManager, jobModel, streamMetadataCache, timerExecutor) // store the (ssp -> if this ssp is catched up) mapping. "catched up" // means the same ssp in other taskInstances have the same offset as @@ -185,6 +190,16 @@ class TaskInstance( } } + def timer(coordinator: ReadableCoordinator) { + trace("Timer for taskName: %s" format taskName) + + exceptionHandler.maybeHandle { + context.getTimerScheduler.removeReadyTimers().entrySet().foreach { entry => + entry.getValue.asInstanceOf[TimerCallback[Any]].onTimer(entry.getKey.getKey, collector, coordinator) + } + } + } + def commit { metrics.commits.inc http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java index a4f0354..249ff09 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java @@ -28,6 +28,7 @@ import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.metrics.ReadableMetricsRegistry; import org.apache.samza.metrics.Timer; +import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.task.MessageCollector; @@ -217,6 +218,11 @@ public class TestOperatorImpl { public WatermarkFunction getWatermarkFn() { return null; } + + @Override + public TimerFunction 
getTimerFn() { + return null; + } } public static Set<OperatorImpl> getNextOperators(OperatorImpl op) { http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java index 1448f79..2d8d1eb 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java @@ -220,7 +220,7 @@ public class TestOperatorImplGraph { new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class)); // verify that the DAG after merge is only traversed & initialized once - verify(mockMapFunction, times(1)).init(any(Config.class), any(TaskContextImpl.class)); + verify(mockMapFunction, times(1)).init(any(Config.class), any(TaskContext.class)); } @Test @@ -249,7 +249,7 @@ public class TestOperatorImplGraph { new OperatorImplGraph(streamGraph, mockConfig, mockTaskContext, mock(Clock.class)); // verify that join function is initialized once. 
- verify(mockJoinFunction, times(1)).init(any(Config.class), any(TaskContextImpl.class)); + verify(mockJoinFunction, times(1)).init(any(Config.class), any(TaskContext.class)); InputOperatorImpl inputOpImpl1 = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream1")); InputOperatorImpl inputOpImpl2 = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream2")); http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java index 28a4f8b..7f54614 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java @@ -86,7 +86,8 @@ public class TestAsyncRunLoop { scala.collection.immutable.Set<SystemStreamPartition> sspSet = JavaConverters.asScalaSetConverter(Collections.singleton(ssp)).asScala().toSet(); return new TaskInstance(task, taskName, mock(Config.class), taskInstanceMetrics, null, consumers, mock(TaskInstanceCollector.class), mock(SamzaContainerContext.class), - manager, null, null, null, sspSet, new TaskInstanceExceptionHandler(taskInstanceMetrics, new scala.collection.immutable.HashSet<String>()), null, null); + manager, null, null, null, sspSet, new TaskInstanceExceptionHandler(taskInstanceMetrics, + new scala.collection.immutable.HashSet<String>()), null, null, null); } TaskInstance createTaskInstance(AsyncStreamTask task, TaskName taskName, SystemStreamPartition ssp) { http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java b/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java new file mode 100644 index 0000000..dd08121 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.samza.task;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link SystemTimerScheduler}, driven by a mocked
+ * {@link ScheduledExecutorService} so that no real waiting is involved.
+ */
+public class TestSystemTimerScheduler {
+
+  /**
+   * Creates a mocked executor whose {@code schedule} call runs the submitted task
+   * synchronously on the calling thread instead of waiting for the requested delay.
+   *
+   * @return the mocked executor service
+   */
+  private ScheduledExecutorService createExecutorService() {
+    ScheduledExecutorService service = mock(ScheduledExecutorService.class);
+    when(service.schedule((Runnable) anyObject(), anyLong(), anyObject())).thenAnswer(invocation -> {
+        Runnable runnable = (Runnable) invocation.getArguments()[0];
+        runnable.run();
+        return mock(ScheduledFuture.class);
+      });
+    return service;
+  }
+
+  /**
+   * Removes the timers the scheduler reports as ready and invokes each callback with
+   * its key, passing mocked collector and coordinator instances.
+   *
+   * @param scheduler the scheduler whose ready timers are fired
+   */
+  private void fireTimers(SystemTimerScheduler scheduler) {
+    scheduler.removeReadyTimers().entrySet().forEach(entry ->
+        entry.getValue().onTimer(entry.getKey().getKey(), mock(MessageCollector.class), mock(TaskCoordinator.class)));
+  }
+
+  /** A single registered timer fires exactly once with the key it was registered under. */
+  @Test
+  public void testSingleTimer() {
+    SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService());
+    List<String> results = new ArrayList<>();
+    scheduler.setTimer("single-timer", 1, (key, collector, coordinator) -> results.add(key));
+
+    fireTimers(scheduler);
+
+    assertEquals(1, results.size());
+    assertEquals("single-timer", results.get(0));
+  }
+
+  /** Timers registered in descending time order are fired in ascending time order. */
+  @Test
+  public void testMultipleTimers() {
+    SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService());
+    List<String> results = new ArrayList<>();
+    scheduler.setTimer("multiple-timer-3", 3, (key, collector, coordinator) -> results.add(key + ":3"));
+    scheduler.setTimer("multiple-timer-2", 2, (key, collector, coordinator) -> results.add(key + ":2"));
+    scheduler.setTimer("multiple-timer-1", 1, (key, collector, coordinator) -> results.add(key + ":1"));
+
+    fireTimers(scheduler);
+
+    assertEquals(3, results.size());
+    assertEquals("multiple-timer-1:1", results.get(0));
+    assertEquals("multiple-timer-2:2", results.get(1));
+    assertEquals("multiple-timer-3:3", results.get(2));
+  }
+
+  /** Distinct key objects each receive their own callback, ordered by timer time. */
+  @Test
+  public void testMultipleKeys() {
+    Object key1 = new Object();
+    Object key2 = new Object();
+    List<String> results = new ArrayList<>();
+
+    SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService());
+    scheduler.setTimer(key1, 2, (key, collector, coordinator) -> {
+        assertEquals(key1, key);
+        results.add("key1:2");
+      });
+    scheduler.setTimer(key2, 1, (key, collector, coordinator) -> {
+        assertEquals(key2, key);
+        results.add("key2:1");
+      });
+
+    fireTimers(scheduler);
+
+    assertEquals(2, results.size());
+    assertEquals("key2:1", results.get(0));
+    assertEquals("key1:2", results.get(1));
+  }
+
+  /** Keys of different types (String and Long) can be registered on the same scheduler. */
+  @Test
+  public void testMultipleKeyTypes() {
+    String key1 = "key";
+    Long key2 = Long.MAX_VALUE;
+    List<String> results = new ArrayList<>();
+
+    SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService());
+    scheduler.setTimer(key1, 1, (key, collector, coordinator) -> {
+        assertEquals(key1, key);
+        results.add("key:1");
+      });
+    scheduler.setTimer(key2, 2, (key, collector, coordinator) -> {
+        assertEquals(Long.MAX_VALUE, key.longValue());
+        results.add(Long.MAX_VALUE + ":2");
+      });
+
+    fireTimers(scheduler);
+
+    assertEquals(2, results.size());
+    assertEquals(key1 + ":1", results.get(0));
+    assertEquals(key2 + ":2", results.get(1));
+  }
+
+  /**
+   * Deleting a timer cancels its scheduled future and prevents the callback from running.
+   * The executor mock used here never runs the task, so the timer is still pending when
+   * it is deleted.
+   */
+  @Test
+  public void testRemoveTimer() {
+    ScheduledExecutorService service = mock(ScheduledExecutorService.class);
+    ScheduledFuture<?> future = mock(ScheduledFuture.class);
+    when(future.cancel(anyBoolean())).thenReturn(true);
+    when(service.schedule((Runnable) anyObject(), anyLong(), anyObject())).thenAnswer(invocation -> future);
+
+    SystemTimerScheduler scheduler = SystemTimerScheduler.create(service);
+    List<String> results = new ArrayList<>();
+    scheduler.setTimer("timer", 1, (key, collector, coordinator) -> results.add(key));
+
+    scheduler.deleteTimer("timer");
+
+    fireTimers(scheduler);
+
+    assertTrue(results.isEmpty());
+    verify(future, times(1)).cancel(anyBoolean());
+  }
+
+  /** A registered listener is notified once for a single timer. */
+  @Test
+  public void testTimerListener() {
+    SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService());
+    List<String> results = new ArrayList<>();
+    scheduler.registerListener(() -> results.add("timer-listener"));
+
+    scheduler.setTimer("timer-listener", 1, (key, collector, coordinator) -> {
+      });
+
+    fireTimers(scheduler);
+
+    assertEquals(1, results.size());
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java b/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java
new file mode 100644
index 0000000..d8913c5
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/timer/TestTimerApp.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.timer;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.TimerRegistry;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.test.operator.data.PageView;
+import org.apache.samza.test.util.StreamAssert;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test application for timers in the high-level API. A flat-map operator buffers the
+ * incoming page views, registers a timer once it has seen all messages expected for its
+ * task, and emits the buffered "-complete" page views when the timer fires.
+ */
+public class TestTimerApp implements StreamApplication {
+  public static final String PAGE_VIEWS = "page-views";
+
+  /**
+   * Wires the page-view input through {@link FlatmapTimerFn} and asserts that the output
+   * contains the four expected "-complete" page views, in any order.
+   */
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
+    final MessageStream<PageView> pageViews = graph.getInputStream(PAGE_VIEWS, serde);
+    final MessageStream<PageView> output = pageViews.flatMap(new FlatmapTimerFn());
+
+    StreamAssert.that("Output from timer function should contain all complete messages", output, serde)
+        .containsInAnyOrder(
+            Arrays.asList(
+                new PageView("v1-complete", "p1", "u1"),
+                new PageView("v2-complete", "p2", "u1"),
+                new PageView("v3-complete", "p1", "u2"),
+                new PageView("v4-complete", "p3", "u2")
+            ));
+  }
+
+  /**
+   * Buffers every incoming page view (with "-complete" appended to its view id) and emits
+   * nothing from {@code apply}; the buffered views are returned from {@code onTimer}.
+   */
+  private static class FlatmapTimerFn implements FlatMapFunction<PageView, PageView>, TimerFunction<String, PageView> {
+
+    /** Key under which the completion timer is registered. */
+    private static final String TIMER_KEY = "CompleteTimer";
+    /** Number of buffered messages after which the timer is registered for a task. */
+    private static final int MESSAGES_PER_TASK = 2;
+    /** Delay, in milliseconds from the current system time, at which the timer is set. */
+    private static final long TIMER_DELAY_MS = 100L;
+
+    private final List<PageView> pageViews = new ArrayList<>();
+    private TimerRegistry<String> timerRegistry;
+
+    /** Keeps the registry so that {@code apply} can register the timer later. */
+    @Override
+    public void registerTimer(TimerRegistry<String> timerRegistry) {
+      this.timerRegistry = timerRegistry;
+    }
+
+    /**
+     * Buffers the "-complete" copy of the message and registers the timer when the
+     * buffer reaches {@link #MESSAGES_PER_TASK} entries.
+     *
+     * @return always an empty collection; output is deferred to {@code onTimer}
+     */
+    @Override
+    public Collection<PageView> apply(PageView message) {
+      final PageView pv = new PageView(message.getViewId() + "-complete", message.getPageId(), message.getUserId());
+      pageViews.add(pv);
+
+      if (pageViews.size() == MESSAGES_PER_TASK) {
+        // got all messages for this task
+        final long time = System.currentTimeMillis() + TIMER_DELAY_MS;
+        timerRegistry.register(TIMER_KEY, time);
+      }
+      return Collections.emptyList();
+    }
+
+    /** Emits every page view buffered so far. */
+    @Override
+    public Collection<PageView> onTimer(String key, long time) {
+      return pageViews;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/aa058655/samza-test/src/test/java/org/apache/samza/test/timer/TimerTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/timer/TimerTest.java b/samza-test/src/test/java/org/apache/samza/test/timer/TimerTest.java
new file mode 100644
index 0000000..11b3aeb
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/timer/TimerTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.timer;
+
+import org.apache.samza.test.operator.StreamApplicationIntegrationTestHarness;
+import org.junit.Before;
+import org.junit.Test;
+
+
+import static org.apache.samza.test.timer.TestTimerApp.PAGE_VIEWS;
+
+/**
+ * Integration test that runs {@link TestTimerApp} against page-view messages produced
+ * to the {@code PAGE_VIEWS} topic. The output assertions live in the application itself.
+ */
+public class TimerTest extends StreamApplicationIntegrationTestHarness {
+
+  /** Creates the input topic and fills it with the page views the application expects. */
+  @Before
+  public void setup() {
+    createTopic(PAGE_VIEWS, 2);
+    producePageViews();
+  }
+
+  /**
+   * Produces four page views as JSON, two with second argument 0 and two with 1
+   * (presumably the target partition -- confirm against the harness), keyed by page id:
+   *   u1: (v1, p1), (v2, p2)
+   *   u2: (v3, p1), (v4, p3)
+   */
+  private void producePageViews() {
+    produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}");
+    produceMessage(PAGE_VIEWS, 1, "p2", "{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}");
+    produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v3\",\"pageId\":\"p1\",\"userId\":\"u2\"}");
+    produceMessage(PAGE_VIEWS, 1, "p3", "{\"viewId\":\"v4\",\"pageId\":\"p3\",\"userId\":\"u2\"}");
+  }
+
+  /** Runs the timer application under the job name "TimerTest". */
+  @Test
+  public void testJob() {
+    runApplication(new TestTimerApp(), "TimerTest", null);
+  }
+}