vvcephei commented on a change in pull request #9222:
URL: https://github.com/apache/kafka/pull/9222#discussion_r477477190



##########
File path: 
streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
##########
@@ -36,19 +36,19 @@
 public class WordCountProcessorTest {
     @Test
     public void test() {
-        final MockProcessorContext context = new MockProcessorContext();
+        final MockProcessorContext<String, String> context = new 
MockProcessorContext<>();
 
         // Create, initialize, and register the state store.
         final KeyValueStore<String, Integer> store =
             
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("Counts"), 
Serdes.String(), Serdes.Integer())
                 .withLoggingDisabled() // Changelog is not supported by 
MockProcessorContext.
                 // Caching is disabled by default, but FYI: caching is also 
not supported by MockProcessorContext.
                 .build();
-        store.init(context, store);
+        store.init(context.getStateStoreContext(), store);

Review comment:
       Here's where we're switching contexts to the StateStoreContext to invoke 
the new API.

##########
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
##########
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.internals.ApiUtils;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for 
users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class 
(org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing 
scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link 
Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext<KForward, VForward> implements 
ProcessorContext<KForward, VForward>, RecordCollector.Supplier {
+    // Immutable fields ================================================
+    private final StreamsMetricsImpl metrics;
+    private final TaskId taskId;
+    private final StreamsConfig config;
+    private final File stateDir;
+
+    // settable record metadata 
================================================
+    private String topic;
+    private Integer partition;
+    private Long offset;
+    private Headers headers;
+    private Long timestamp;
+
+    // mocks ================================================
+    private final Map<String, StateStore> stateStores = new HashMap<>();
+    private final List<CapturedPunctuator> punctuators = new LinkedList<>();
+    private final List<CapturedForward<KForward, VForward>> capturedForwards = 
new LinkedList<>();
+    private boolean committed = false;
+
+
+    /**
+     * {@link CapturedPunctuator} holds captured punctuators, along with their 
scheduling information.
+     */
+    public static final class CapturedPunctuator {
+        private final long intervalMs;
+        private final PunctuationType type;
+        private final Punctuator punctuator;
+        private boolean cancelled = false;
+
+        private CapturedPunctuator(final long intervalMs, final 
PunctuationType type, final Punctuator punctuator) {
+            this.intervalMs = intervalMs;
+            this.type = type;
+            this.punctuator = punctuator;
+        }
+
+        @SuppressWarnings("unused")
+        public long getIntervalMs() {
+            return intervalMs;
+        }
+
+        @SuppressWarnings("unused")
+        public PunctuationType getType() {
+            return type;
+        }
+
+        @SuppressWarnings("unused")
+        public Punctuator getPunctuator() {
+            return punctuator;
+        }
+
+        @SuppressWarnings({"WeakerAccess", "unused"})
+        public void cancel() {
+            cancelled = true;
+        }
+
+        @SuppressWarnings("unused")
+        public boolean cancelled() {
+            return cancelled;
+        }
+    }
+
+    public static final class CapturedForward<KForward, VForward> {
+        private final String childName;
+        private final long timestamp;
+        private final KeyValue<KForward, VForward> keyValue;
+
+        private CapturedForward(final To to, final KeyValue<KForward, 
VForward> keyValue) {
+            if (keyValue == null) {
+                throw new IllegalArgumentException("keyValue can't be null");
+            }
+
+            try {
+                final Field field = To.class.getDeclaredField("childName");
+                field.setAccessible(true);
+                childName = (String) field.get(to);
+            } catch (final IllegalAccessException | NoSuchFieldException e) {
+                throw new RuntimeException(e);
+            }
+            timestamp = getTimestamp(to);
+
+            this.keyValue = keyValue;
+        }
+
+        /**
+         * The child this data was forwarded to.
+         *
+         * @return The child name, or {@code null} if it was broadcast.
+         */
+        @SuppressWarnings("unused")
+        public String childName() {
+            return childName;
+        }
+
+        /**
+         * The timestamp attached to the forwarded record.
+         *
+         * @return A timestamp, or {@code -1} if none was forwarded.
+         */
+        @SuppressWarnings("unused")
+        public long timestamp() {
+            return timestamp;
+        }
+
+        /**
+         * The data forwarded.
+         *
+         * @return A key/value pair. Not null.
+         */
+        @SuppressWarnings("unused")
+        public KeyValue<KForward, VForward> keyValue() {
+            return keyValue;
+        }
+
+        @Override
+        public String toString() {
+            return "CapturedForward{" +
+                "childName='" + childName + '\'' +
+                ", timestamp=" + timestamp +
+                ", keyValue=" + keyValue +
+                '}';
+        }
+    }
+
+    // constructors ================================================
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code config} and 
{@code taskId} and {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     */
+    @SuppressWarnings("unused")
+    public MockProcessorContext() {
+        this(
+            mkProperties(mkMap(
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, ""),
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "")
+            )),
+            new TaskId(0, 0),
+            null
+        );
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code taskId} and 
{@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     *
+     * @param config a Properties object, used to configure the context and 
the processor.
+     */
+    @SuppressWarnings("unused")
+    public MockProcessorContext(final Properties config) {
+        this(config, new TaskId(0, 0), null);
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with a specified taskId and 
stateDir.
+     *
+     * @param config   a {@link Properties} object, used to configure the 
context and the processor.
+     * @param taskId   a {@link TaskId}, which the context makes available via 
{@link MockProcessorContext#taskId()}.
+     * @param stateDir a {@link File}, which the context makes available via 
{@link MockProcessorContext#stateDir()}.
+     */
+    @SuppressWarnings("unused")
+    public MockProcessorContext(final Properties config, final TaskId taskId, 
final File stateDir) {
+        final StreamsConfig streamsConfig = new 
ClientUtils.QuietStreamsConfig(config);
+        this.taskId = taskId;
+        this.config = streamsConfig;
+        this.stateDir = stateDir;
+        final MetricConfig metricConfig = new MetricConfig();
+        metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG);
+        final String threadId = Thread.currentThread().getName();
+        metrics = new StreamsMetricsImpl(
+            new Metrics(metricConfig),
+            threadId,
+            
streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
+            Time.SYSTEM
+        );
+        TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
taskId.toString(), metrics);
+    }
+
+    @Override
+    public String applicationId() {
+        return config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+    }
+
+    @Override
+    public TaskId taskId() {
+        return taskId;
+    }
+
+    @Override
+    public Map<String, Object> appConfigs() {
+        final Map<String, Object> combined = new HashMap<>();
+        combined.putAll(config.originals());
+        combined.putAll(config.values());
+        return combined;
+    }
+
+    @Override
+    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
+        return config.originalsWithPrefix(prefix);
+    }
+
+    @Override
+    public Serde<?> keySerde() {
+        return config.defaultKeySerde();
+    }
+
+    @Override
+    public Serde<?> valueSerde() {
+        return config.defaultValueSerde();
+    }
+
+    @Override
+    public File stateDir() {
+        return stateDir;
+    }
+
+    @Override
+    public StreamsMetrics metrics() {
+        return metrics;
+    }
+
+    // settable record metadata 
================================================
+
+    /**
+     * The context exposes these metadata for use in the processor. Normally, 
they are set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set them directly.
+     *
+     * @param topic     A topic name
+     * @param partition A partition number
+     * @param offset    A record offset
+     * @param timestamp A record timestamp
+     */
+    @SuppressWarnings("unused")
+    public void setRecordMetadata(final String topic,
+                                  final int partition,
+                                  final long offset,
+                                  final Headers headers,
+                                  final long timestamp) {
+        this.topic = topic;
+        this.partition = partition;
+        this.offset = offset;
+        this.headers = headers;
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, 
it is set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. 
Setting this attribute doesn't affect the others.
+     *
+     * @param topic A topic name
+     */
+    @SuppressWarnings("unused")
+    public void setTopic(final String topic) {
+        this.topic = topic;
+    }
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, 
it is set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. 
Setting this attribute doesn't affect the others.
+     *
+     * @param partition A partition number
+     */
+    @SuppressWarnings("unused")
+    public void setPartition(final int partition) {
+        this.partition = partition;
+    }
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, 
it is set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. 
Setting this attribute doesn't affect the others.
+     *
+     * @param offset A record offset
+     */
+    @SuppressWarnings("unused")
+    public void setOffset(final long offset) {
+        this.offset = offset;
+    }
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, 
it is set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. 
Setting this attribute doesn't affect the others.
+     *
+     * @param headers Record headers
+     */
+    @SuppressWarnings("unused")
+    public void setHeaders(final Headers headers) {
+        this.headers = headers;
+    }
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, 
it is set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. 
Setting this attribute doesn't affect the others.
+     *
+     * @param timestamp A record timestamp
+     */
+    @SuppressWarnings("unused")
+    public void setTimestamp(final long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    @Override
+    public String topic() {
+        if (topic == null) {
+            throw new IllegalStateException("Topic must be set before use via 
setRecordMetadata() or setTopic().");
+        }
+        return topic;
+    }
+
+    @Override
+    public int partition() {
+        if (partition == null) {
+            throw new IllegalStateException("Partition must be set before use 
via setRecordMetadata() or setPartition().");
+        }
+        return partition;
+    }
+
+    @Override
+    public long offset() {
+        if (offset == null) {
+            throw new IllegalStateException("Offset must be set before use via 
setRecordMetadata() or setOffset().");
+        }
+        return offset;
+    }
+
+    @Override
+    public Headers headers() {
+        return headers;
+    }
+
+    @Override
+    public long timestamp() {
+        if (timestamp == null) {
+            throw new IllegalStateException("Timestamp must be set before use 
via setRecordMetadata() or setTimestamp().");
+        }
+        return timestamp;
+    }
+
+    // mocks ================================================
+
+    @Override
+    public void register(final StateStore store,
+                         final StateRestoreCallback 
stateRestoreCallbackIsIgnoredInMock) {
+        stateStores.put(store.name(), store);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <S extends StateStore> S getStateStore(final String name) {
+        return (S) stateStores.get(name);
+    }
+
+    @Override
+    public Cancellable schedule(final Duration interval,
+                                final PunctuationType type,
+                                final Punctuator callback) {
+        final CapturedPunctuator capturedPunctuator =
+            new 
CapturedPunctuator(ApiUtils.validateMillisecondDuration(interval, "interval"), 
type, callback);
+
+        punctuators.add(capturedPunctuator);
+
+        return capturedPunctuator::cancel;
+    }
+
+    /**
+     * Get the punctuators scheduled so far. The returned list is not affected 
by subsequent calls to {@code schedule(...)}.
+     *
+     * @return A list of captured punctuators.
+     */
+    @SuppressWarnings("unused")
+    public List<CapturedPunctuator> scheduledPunctuators() {
+        return new LinkedList<>(punctuators);
+    }
+
+    @Override
+    public <K extends KForward, V extends VForward> void forward(final K key, 
final V value) {
+        forward(key, value, To.all());
+    }
+
+    @Override
+    public <K extends KForward, V extends VForward> void forward(final K key, 
final V value, final To to) {
+        capturedForwards.add(
+            new CapturedForward<>(
+                (getTimestamp(to)) == -1 ? to.withTimestamp(timestamp == null 
? -1 : timestamp) : to,
+                new KeyValue<>(key, value)
+            )
+        );
+    }
+
+    /**
+     * Get all the forwarded data this context has observed. The returned list 
will not be
+     * affected by subsequent interactions with the context. The data in the 
list is in the same order as the calls to
+     * {@code forward(...)}.
+     *
+     * @return A list of key/value pairs that were previously passed to the 
context.
+     */
+    public List<CapturedForward<KForward, VForward>> forwarded() {
+        return new LinkedList<>(capturedForwards);
+    }
+
+    /**
+     * Get all the forwarded data this context has observed for a specific 
child by name.
+     * The returned list will not be affected by subsequent interactions with 
the context.
+     * The data in the list is in the same order as the calls to {@code 
forward(...)}.
+     *
+     * @param childName The child name to retrieve forwards for
+     * @return A list of key/value pairs that were previously passed to the 
context.
+     */
+    @SuppressWarnings("unused")
+    public List<CapturedForward<KForward, VForward>> forwarded(final String 
childName) {
+        final LinkedList<CapturedForward<KForward, VForward>> result = new 
LinkedList<>();
+        for (final CapturedForward<KForward, VForward> capture : 
capturedForwards) {
+            if (capture.childName() == null || 
capture.childName().equals(childName)) {
+                result.add(capture);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Clear the captured forwarded data.
+     */
+    @SuppressWarnings("unused")
+    public void resetForwards() {
+        capturedForwards.clear();
+    }
+
+    @Override
+    public void commit() {
+        committed = true;
+    }
+
+    /**
+     * Whether {@link ProcessorContext#commit()} has been called in this 
context.
+     *
+     * @return {@code true} iff {@link ProcessorContext#commit()} has been 
called in this context since construction or reset.
+     */
+    public boolean committed() {
+        return committed;
+    }
+
+    /**
+     * Reset the commit capture to {@code false} (whether or not it was 
previously {@code true}).
+     */
+    @SuppressWarnings("unused")
+    public void resetCommit() {
+        committed = false;
+    }
+
+    @Override
+    public RecordCollector recordCollector() {
+        // This interface is assumed by state stores that add change-logging.
+        // Rather than risk a mysterious ClassCastException during unit tests, 
throw an explanatory exception.
+
+        throw new UnsupportedOperationException(
+            "MockProcessorContext does not provide record collection. " +
+                "For processor unit tests, use an in-memory state store with 
change-logging disabled. " +
+                "Alternatively, use the TopologyTestDriver for testing 
processor/store/topology integration."
+        );
+    }
+
+    /**
+     * Used to get a {@link StateStoreContext} for use with
+     * {@link StateStore#init(StateStoreContext, StateStore)}
+     * if you need to initialize a store for your tests.
+     * @return a {@link StateStoreContext} that delegates to this 
ProcessorContext.
+     */
+    public StateStoreContext getStateStoreContext() {

Review comment:
       Added this so that users can still just have one context that they can 
use for initializing their processors and stores.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreToProcessorContextAdapter.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Map;
+
+public final class StoreToProcessorContextAdapter implements ProcessorContext {
+    private final StateStoreContext delegate;
+
+    public static ProcessorContext adapt(final StateStoreContext delegate) {
+        if (delegate instanceof ProcessorToStoreContextAdapter) {
+            return ((ProcessorToStoreContextAdapter) delegate).delegate();
+        } else {
+            return new StoreToProcessorContextAdapter(delegate);
+        }
+    }

Review comment:
       To let the new method delegate to the old one, we adapt the provided 
StateStoreContext to ProcessorContext.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * State store context interface.
+ */
+public interface StateStoreContext {

Review comment:
       Here's the proposal for the new context. I've preserved all the "general 
context" members while dropping all the "record context" ones and anything else 
that seemed inappropriate for state stores to invoke.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##########
@@ -61,7 +84,9 @@
      * @throws IllegalStateException If store gets registered after 
initialized is already finished
      * @throws StreamsException if the store's change log does not contain the 
partition
      */
-    void init(ProcessorContext context, StateStore root);
+    default void init(final StateStoreContext context, final StateStore root) {
+        init(StoreToProcessorContextAdapter.adapt(context), root);
+    }

Review comment:
       Proposing to add this new version of `init` since the old 
`ProcessorContext` is slated to be deprecated once KIP-478 is fully implemented.
   
   Rather than just accepting the whole new ProcessorContext, I'm proposing to 
add a new StateStoreContext to avoid leaking a bunch of methods that should 
really only be invoked in Processor implementations.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to