This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8d88b20  KAFKA-10199: Add interface for state updater (#11499)
8d88b20 is described below

commit 8d88b20b2779faa413ffc4c6d2546800e225213f
Author: Bruno Cadonna <cado...@apache.org>
AuthorDate: Wed Feb 23 19:13:08 2022 +0100

    KAFKA-10199: Add interface for state updater (#11499)
    
    Reviewers: Andrew Eugene Choi <andrew.c...@uwaterloo.ca>, Guozhang Wang <wangg...@gmail.com>
---
 .../streams/processor/internals/StateUpdater.java  | 69 ++++++++++++++++++++++
 1 file changed, 69 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
new file mode 100644
index 0000000..8965abf
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Set;
+
+public interface StateUpdater {
+
+    /**
+     * Adds a task (active or standby) to the state updater.
+     *
+     * @param task task to add
+     */
+    void add(final Task task);
+
+    /**
+     * Removes a task (active or standby) from the state updater.
+     *
+     * @param task task to remove
+     */
+    void remove(final Task task);
+
+    /**
+     * Gets restored active tasks from state restoration/update.
+     *
+     * @param timeout duration how long the calling thread should wait for restored active tasks
+     *
+     * @return set of active tasks with up-to-date states
+     */
+    Set<StreamTask> getRestoredActiveTasks(final Duration timeout);
+
+    /**
+     * Gets a list of exceptions thrown during restoration.
+     *
+     * @return list of exceptions thrown during restoration
+     */
+    List<RuntimeException> getExceptions();
+
+
+    /**
+     * Gets all tasks (active and standby) that are managed by the state updater.
+     *
+     * @return set of tasks managed by the state updater
+     */
+    Set<Task> getAllTasks();
+
+    /**
+     * Shuts down the state updater.
+     *
+     * @param timeout duration how long to wait until the state updater is shut down
+     */
+    void shutdown(final Duration timeout);
+}

Reply via email to