This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 392e49b MINOR: consolidate processor context for active/standby (#8669) 392e49b is described below commit 392e49b1eddd2fcd8d09177cc80abc8a51f5c991 Author: A. Sophie Blee-Goldman <sop...@confluent.io> AuthorDate: Mon May 18 14:50:54 2020 -0700 MINOR: consolidate processor context for active/standby (#8669) This is a prerequisite for KAFKA-9501 and will also be useful for KAFKA-9603. There should be no logical changes here: the main difference is the removal of StandbyContextImpl in preparation for contexts to transition between active and standby. Also includes some minor cleanup, e.g. pulling the ReadOnly/ReadWrite decorators out into a separate file. Reviewers: Bruno Cadonna <br...@confluent.io>, John Roesler <vvcep...@apache.org>, Guozhang Wang <wangg...@gmail.com> --- .../internals/AbstractProcessorContext.java | 10 +- .../internals/AbstractReadOnlyDecorator.java | 252 ++++++++++ .../internals/AbstractReadWriteDecorator.java | 248 ++++++++++ .../internals/GlobalProcessorContextImpl.java | 37 +- .../internals/GlobalStateManagerImpl.java | 7 +- .../internals/InternalProcessorContext.java | 19 +- .../processor/internals/ProcessorContextImpl.java | 548 +++++---------------- .../processor/internals/ProcessorStateManager.java | 4 +- .../processor/internals/StandbyContextImpl.java | 190 ------- .../streams/processor/internals/StandbyTask.java | 2 +- .../streams/processor/internals/StateManager.java | 4 +- .../state/internals/CachingKeyValueStore.java | 2 +- .../state/internals/CachingSessionStore.java | 2 +- .../state/internals/CachingWindowStore.java | 2 +- .../internals/ChangeLoggingKeyValueBytesStore.java | 14 +- .../internals/ChangeLoggingSessionBytesStore.java | 20 +- ...ChangeLoggingTimestampedKeyValueBytesStore.java | 4 +- .../ChangeLoggingTimestampedWindowBytesStore.java | 4 +- .../internals/ChangeLoggingWindowBytesStore.java | 17 +- .../streams/state/internals/StoreChangeLogger.java | 71 --- 
.../internals/AbstractProcessorContextTest.java | 12 +- .../internals/GlobalProcessorContextImplTest.java | 2 + .../internals/ProcessorContextImplTest.java | 203 +++++++- .../processor/internals/ProcessorContextTest.java | 9 +- .../processor/internals/StandbyTaskTest.java | 5 +- .../processor/internals/StateManagerStub.java | 6 + .../streams/state/KeyValueStoreTestDriver.java | 2 +- .../ChangeLoggingSessionBytesStoreTest.java | 29 +- ...angeLoggingTimestampedWindowBytesStoreTest.java | 37 +- .../ChangeLoggingWindowBytesStoreTest.java | 40 +- .../state/internals/StoreChangeLoggerTest.java | 84 ---- .../apache/kafka/test/GlobalStateManagerStub.java | 6 + .../kafka/test/InternalMockProcessorContext.java | 23 + .../kafka/test/MockInternalProcessorContext.java | 17 +- .../apache/kafka/test/NoOpProcessorContext.java | 17 +- .../apache/kafka/streams/TopologyTestDriver.java | 2 +- .../streams/processor/MockProcessorContext.java | 1 + 37 files changed, 1020 insertions(+), 932 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java index e684344..1132708 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.internals.ThreadCache; @@ -30,7 +31,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; - public abstract class 
AbstractProcessorContext implements InternalProcessorContext { public static final String NONEXIST_TOPIC = "__null_topic__"; @@ -138,6 +138,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte if (recordContext == null) { throw new IllegalStateException("This should not happen as partition() should only be called while a record is processed"); } + return recordContext.partition(); } @@ -205,7 +206,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte } @Override - public ThreadCache getCache() { + public ThreadCache cache() { return cache; } @@ -218,4 +219,9 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte public void uninitialize() { initialized = false; } + + @Override + public TaskType taskType() { + return stateManager.taskType(); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java new file mode 100644 index 0000000..a63cd99 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import java.util.List; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.apache.kafka.streams.state.internals.WrappedStateStore; + +abstract class AbstractReadOnlyDecorator<T extends StateStore, K, V> extends WrappedStateStore<T, K, V> { + + static final String ERROR_MESSAGE = "Global store is read only"; + + private AbstractReadOnlyDecorator(final T inner) { + super(inner); + } + + @Override + public void flush() { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public void init(final ProcessorContext context, + final StateStore root) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public void close() { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + static StateStore getReadOnlyStore(final StateStore global) { + if (global instanceof TimestampedKeyValueStore) { + return new TimestampedKeyValueStoreReadOnlyDecorator<>((TimestampedKeyValueStore<?, ?>) global); + } else if (global instanceof KeyValueStore) { + return new KeyValueStoreReadOnlyDecorator<>((KeyValueStore<?, ?>) global); + } else if (global instanceof TimestampedWindowStore) { + return new 
TimestampedWindowStoreReadOnlyDecorator<>((TimestampedWindowStore<?, ?>) global); + } else if (global instanceof WindowStore) { + return new WindowStoreReadOnlyDecorator<>((WindowStore<?, ?>) global); + } else if (global instanceof SessionStore) { + return new SessionStoreReadOnlyDecorator<>((SessionStore<?, ?>) global); + } else { + return global; + } + } + + static class KeyValueStoreReadOnlyDecorator<K, V> + extends AbstractReadOnlyDecorator<KeyValueStore<K, V>, K, V> + implements KeyValueStore<K, V> { + + private KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V> inner) { + super(inner); + } + + @Override + public V get(final K key) { + return wrapped().get(key); + } + + @Override + public KeyValueIterator<K, V> range(final K from, + final K to) { + return wrapped().range(from, to); + } + + @Override + public KeyValueIterator<K, V> all() { + return wrapped().all(); + } + + @Override + public long approximateNumEntries() { + return wrapped().approximateNumEntries(); + } + + @Override + public void put(final K key, + final V value) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public V putIfAbsent(final K key, + final V value) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public void putAll(final List<KeyValue<K, V>> entries) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public V delete(final K key) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + } + + static class TimestampedKeyValueStoreReadOnlyDecorator<K, V> + extends KeyValueStoreReadOnlyDecorator<K, ValueAndTimestamp<V>> + implements TimestampedKeyValueStore<K, V> { + + private TimestampedKeyValueStoreReadOnlyDecorator(final TimestampedKeyValueStore<K, V> inner) { + super(inner); + } + } + + static class WindowStoreReadOnlyDecorator<K, V> + extends AbstractReadOnlyDecorator<WindowStore<K, V>, K, V> + implements WindowStore<K, V> { + + private WindowStoreReadOnlyDecorator(final 
WindowStore<K, V> inner) { + super(inner); + } + + @Deprecated + @Override + public void put(final K key, + final V value) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public void put(final K key, + final V value, + final long windowStartTimestamp) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public V fetch(final K key, + final long time) { + return wrapped().fetch(key, time); + } + + @Override + @Deprecated + public WindowStoreIterator<V> fetch(final K key, + final long timeFrom, + final long timeTo) { + return wrapped().fetch(key, timeFrom, timeTo); + } + + @Override + @Deprecated + public KeyValueIterator<Windowed<K>, V> fetch(final K from, + final K to, + final long timeFrom, + final long timeTo) { + return wrapped().fetch(from, to, timeFrom, timeTo); + } + + @Override + public KeyValueIterator<Windowed<K>, V> all() { + return wrapped().all(); + } + + @Override + @Deprecated + public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, + final long timeTo) { + return wrapped().fetchAll(timeFrom, timeTo); + } + } + + static class TimestampedWindowStoreReadOnlyDecorator<K, V> + extends WindowStoreReadOnlyDecorator<K, ValueAndTimestamp<V>> + implements TimestampedWindowStore<K, V> { + + private TimestampedWindowStoreReadOnlyDecorator(final TimestampedWindowStore<K, V> inner) { + super(inner); + } + } + + static class SessionStoreReadOnlyDecorator<K, AGG> + extends AbstractReadOnlyDecorator<SessionStore<K, AGG>, K, AGG> + implements SessionStore<K, AGG> { + + private SessionStoreReadOnlyDecorator(final SessionStore<K, AGG> inner) { + super(inner); + } + + @Override + public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime); + } + + @Override + public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, + 
final K keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); + } + + @Override + public void remove(final Windowed<K> sessionKey) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public void put(final Windowed<K> sessionKey, + final AGG aggregate) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public AGG fetchSession(final K key, final long startTime, final long endTime) { + return wrapped().fetchSession(key, startTime, endTime); + } + + @Override + public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) { + return wrapped().fetch(key); + } + + @Override + public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, + final K to) { + return wrapped().fetch(from, to); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java new file mode 100644 index 0000000..494d98e --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import java.util.List; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.apache.kafka.streams.state.internals.WrappedStateStore; + +abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends WrappedStateStore<T, K, V> { + static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams"; + + private AbstractReadWriteDecorator(final T inner) { + super(inner); + } + + @Override + public void init(final ProcessorContext context, + final StateStore root) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public void close() { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + static StateStore getReadWriteStore(final StateStore store) { + if (store instanceof TimestampedKeyValueStore) { + return new TimestampedKeyValueStoreReadWriteDecorator<>((TimestampedKeyValueStore<?, ?>) store); + } else if (store instanceof KeyValueStore) { + return new KeyValueStoreReadWriteDecorator<>((KeyValueStore<?, ?>) store); + } else if (store instanceof TimestampedWindowStore) { + return new TimestampedWindowStoreReadWriteDecorator<>((TimestampedWindowStore<?, ?>) store); + } else if (store instanceof WindowStore) { + 
return new WindowStoreReadWriteDecorator<>((WindowStore<?, ?>) store); + } else if (store instanceof SessionStore) { + return new SessionStoreReadWriteDecorator<>((SessionStore<?, ?>) store); + } else { + return store; + } + } + + static class KeyValueStoreReadWriteDecorator<K, V> + extends AbstractReadWriteDecorator<KeyValueStore<K, V>, K, V> + implements KeyValueStore<K, V> { + + KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V> inner) { + super(inner); + } + + @Override + public V get(final K key) { + return wrapped().get(key); + } + + @Override + public KeyValueIterator<K, V> range(final K from, + final K to) { + return wrapped().range(from, to); + } + + @Override + public KeyValueIterator<K, V> all() { + return wrapped().all(); + } + + @Override + public long approximateNumEntries() { + return wrapped().approximateNumEntries(); + } + + @Override + public void put(final K key, + final V value) { + wrapped().put(key, value); + } + + @Override + public V putIfAbsent(final K key, + final V value) { + return wrapped().putIfAbsent(key, value); + } + + @Override + public void putAll(final List<KeyValue<K, V>> entries) { + wrapped().putAll(entries); + } + + @Override + public V delete(final K key) { + return wrapped().delete(key); + } + } + + static class TimestampedKeyValueStoreReadWriteDecorator<K, V> + extends KeyValueStoreReadWriteDecorator<K, ValueAndTimestamp<V>> + implements TimestampedKeyValueStore<K, V> { + + TimestampedKeyValueStoreReadWriteDecorator(final TimestampedKeyValueStore<K, V> inner) { + super(inner); + } + } + + static class WindowStoreReadWriteDecorator<K, V> + extends AbstractReadWriteDecorator<WindowStore<K, V>, K, V> + implements WindowStore<K, V> { + + WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) { + super(inner); + } + + @Deprecated + @Override + public void put(final K key, + final V value) { + wrapped().put(key, value); + } + + @Override + public void put(final K key, + final V value, + final long 
windowStartTimestamp) { + wrapped().put(key, value, windowStartTimestamp); + } + + @Override + public V fetch(final K key, + final long time) { + return wrapped().fetch(key, time); + } + + @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed + @Override + public WindowStoreIterator<V> fetch(final K key, + final long timeFrom, + final long timeTo) { + return wrapped().fetch(key, timeFrom, timeTo); + } + + @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed + @Override + public KeyValueIterator<Windowed<K>, V> fetch(final K from, + final K to, + final long timeFrom, + final long timeTo) { + return wrapped().fetch(from, to, timeFrom, timeTo); + } + + @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed + @Override + public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, + final long timeTo) { + return wrapped().fetchAll(timeFrom, timeTo); + } + + @Override + public KeyValueIterator<Windowed<K>, V> all() { + return wrapped().all(); + } + } + + static class TimestampedWindowStoreReadWriteDecorator<K, V> + extends WindowStoreReadWriteDecorator<K, ValueAndTimestamp<V>> + implements TimestampedWindowStore<K, V> { + + TimestampedWindowStoreReadWriteDecorator(final TimestampedWindowStore<K, V> inner) { + super(inner); + } + } + + static class SessionStoreReadWriteDecorator<K, AGG> + extends AbstractReadWriteDecorator<SessionStore<K, AGG>, K, AGG> + implements SessionStore<K, AGG> { + + SessionStoreReadWriteDecorator(final SessionStore<K, AGG> inner) { + super(inner); + } + + @Override + public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, + final long earliestSessionEndTime, + final long latestSessionStartTime) { + return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime); + } + + @Override + public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, + final K keyTo, + final 
long earliestSessionEndTime, + final long latestSessionStartTime) { + return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); + } + + @Override + public void remove(final Windowed<K> sessionKey) { + wrapped().remove(sessionKey); + } + + @Override + public void put(final Windowed<K> sessionKey, + final AGG aggregate) { + wrapped().put(sessionKey, aggregate); + } + + @Override + public AGG fetchSession(final K key, + final long startTime, + final long endTime) { + return wrapped().fetchSession(key, startTime, endTime); + } + + @Override + public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) { + return wrapped().fetch(key); + } + + @Override + public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, + final K to) { + return wrapped().fetch(from, to); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java index 859430c..81169d3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.streams.processor.internals; +import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore; + +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.PunctuationType; @@ -23,24 +26,13 @@ import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.To; -import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.KeyValueStoreReadWriteDecorator; 
-import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.SessionStoreReadWriteDecorator; -import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.TimestampedKeyValueStoreReadWriteDecorator; -import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.TimestampedWindowStoreReadWriteDecorator; -import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.WindowStoreReadWriteDecorator; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.SessionStore; -import org.apache.kafka.streams.state.TimestampedKeyValueStore; -import org.apache.kafka.streams.state.TimestampedWindowStore; -import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.internals.ThreadCache; import java.time.Duration; public class GlobalProcessorContextImpl extends AbstractProcessorContext { - public GlobalProcessorContextImpl(final StreamsConfig config, final StateManager stateMgr, final StreamsMetricsImpl metrics, @@ -51,20 +43,7 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext { @Override public StateStore getStateStore(final String name) { final StateStore store = stateManager.getGlobalStore(name); - - if (store instanceof TimestampedKeyValueStore) { - return new TimestampedKeyValueStoreReadWriteDecorator<>((TimestampedKeyValueStore<?, ?>) store); - } else if (store instanceof KeyValueStore) { - return new KeyValueStoreReadWriteDecorator<>((KeyValueStore<?, ?>) store); - } else if (store instanceof TimestampedWindowStore) { - return new TimestampedWindowStoreReadWriteDecorator<>((TimestampedWindowStore<?, ?>) store); - } else if (store instanceof WindowStore) { - return new WindowStoreReadWriteDecorator<>((WindowStore<?, ?>) store); - } else if (store instanceof SessionStore) { - return new SessionStoreReadWriteDecorator<>((SessionStore<?, ?>) store); - } - - return 
store; + return getReadWriteStore(store); } @SuppressWarnings("unchecked") @@ -130,4 +109,12 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext { public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) { throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context."); } + + @Override + public void logChange(final String storeName, + final Bytes key, + final byte[] value, + final long timestamp) { + throw new UnsupportedOperationException("this should not happen: logChange() not supported in global processor context."); + } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index f17131d..1def55c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -33,6 +33,7 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.streams.state.internals.RecordConverter; import org.slf4j.Logger; @@ -222,7 +223,6 @@ public class GlobalStateManagerImpl implements GlobalStateManager { } finally { globalConsumer.unsubscribe(); } - } private List<TopicPartition> topicPartitionsForStore(final StateStore store) { @@ -339,7 +339,6 @@ public class GlobalStateManagerImpl implements GlobalStateManager { } } - @Override public void close() throws IOException { try { @@ -394,6 +393,10 @@ 
public class GlobalStateManagerImpl implements GlobalStateManager { } } + @Override + public TaskType taskType() { + return TaskType.GLOBAL; + } @Override public Map<TopicPartition, Long> changelogOffsets() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java index 145e889..db5cfc9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java @@ -16,9 +16,13 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.BytesSerializer; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.RecordContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.internals.ThreadCache; @@ -29,6 +33,8 @@ import org.apache.kafka.streams.state.internals.ThreadCache; * {@link ThreadCache} */ public interface InternalProcessorContext extends ProcessorContext { + BytesSerializer BYTES_KEY_SERIALIZER = new BytesSerializer(); + ByteArraySerializer BYTEARRAY_VALUE_SERIALIZER = new ByteArraySerializer(); @Override StreamsMetricsImpl metrics(); @@ -67,7 +73,7 @@ public interface InternalProcessorContext extends ProcessorContext { /** * Get the thread-global cache */ - ThreadCache getCache(); + ThreadCache cache(); /** * Mark this context as being initialized @@ -80,10 +86,21 @@ public interface InternalProcessorContext extends ProcessorContext { 
void uninitialize(); /** + * @return the type of task (active/standby/global) that this context corresponds to + */ + TaskType taskType(); + + /** * Get a correctly typed state store, given a handle on the original builder. */ @SuppressWarnings("unchecked") default <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) { return (T) getStateStore(builder.name()); } + + void logChange(final String storeName, + final Bytes key, + final byte[] value, + final long timestamp); + } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index d390af5..c776367 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -16,87 +16,124 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.streams.KeyValue; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.internals.ApiUtils; -import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.Cancellable; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import 
org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.SessionStore; -import org.apache.kafka.streams.state.TimestampedKeyValueStore; -import org.apache.kafka.streams.state.TimestampedWindowStore; -import org.apache.kafka.streams.state.ValueAndTimestamp; -import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.streams.state.internals.ThreadCache; -import org.apache.kafka.streams.state.internals.WrappedStateStore; import java.time.Duration; import java.util.List; import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; +import static org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore; +import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore; public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier { - - private final StreamTask task; + // The below are both null for standby tasks + private final StreamTask streamTask; private final RecordCollector collector; + private final ToInternal toInternal = new ToInternal(); private final static To SEND_TO_ALL = To.all(); + final Map<String, String> storeToChangelogTopic = new HashMap<>(); + ProcessorContextImpl(final TaskId id, - final StreamTask task, + final StreamTask streamTask, final StreamsConfig config, final RecordCollector collector, final ProcessorStateManager stateMgr, final StreamsMetricsImpl metrics, final ThreadCache cache) { super(id, config, metrics, stateMgr, cache); - this.task = task; + this.streamTask = streamTask; this.collector = collector; + + if (streamTask == null && taskType() == TaskType.ACTIVE) { + throw new IllegalStateException("Tried to create context for active task but the streamtask was null"); + } + } + + ProcessorContextImpl(final TaskId id, + final StreamsConfig 
config, + final ProcessorStateManager stateMgr, + final StreamsMetricsImpl metrics) { + this( + id, + null, + config, + null, + stateMgr, + metrics, + new ThreadCache( + new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())), + 0, + metrics + ) + ); } - public ProcessorStateManager getStateMgr() { + public ProcessorStateManager stateManager() { return (ProcessorStateManager) stateManager; } @Override + public void register(final StateStore store, + final StateRestoreCallback stateRestoreCallback) { + storeToChangelogTopic.put(store.name(), ProcessorStateManager.storeChangelogTopic(applicationId(), store.name())); + super.register(store, stateRestoreCallback); + } + + @Override public RecordCollector recordCollector() { return collector; } + @Override + public void logChange(final String storeName, + final Bytes key, + final byte[] value, + final long timestamp) { + throwUnsupportedOperationExceptionIfStandby("logChange"); + // Sending null headers to changelog topics (KIP-244) + collector.send( + storeToChangelogTopic.get(storeName), + key, + value, + null, + taskId().partition, + timestamp, + BYTES_KEY_SERIALIZER, + BYTEARRAY_VALUE_SERIALIZER); + } + /** * @throws StreamsException if an attempt is made to access this state store from an unknown node + * @throws UnsupportedOperationException if the current streamTask type is standby */ @Override public StateStore getStateStore(final String name) { + throwUnsupportedOperationExceptionIfStandby("getStateStore"); if (currentNode() == null) { throw new StreamsException("Accessing from an unknown node"); } - final StateStore global = stateManager.getGlobalStore(name); - if (global != null) { - if (global instanceof TimestampedKeyValueStore) { - return new TimestampedKeyValueStoreReadOnlyDecorator<>((TimestampedKeyValueStore<?, ?>) global); - } else if (global instanceof KeyValueStore) { - return new KeyValueStoreReadOnlyDecorator<>((KeyValueStore<?, ?>) global); - } else if (global 
instanceof TimestampedWindowStore) { - return new TimestampedWindowStoreReadOnlyDecorator<>((TimestampedWindowStore<?, ?>) global); - } else if (global instanceof WindowStore) { - return new WindowStoreReadOnlyDecorator<>((WindowStore<?, ?>) global); - } else if (global instanceof SessionStore) { - return new SessionStoreReadOnlyDecorator<>((SessionStore<?, ?>) global); - } - - return global; + final StateStore globalStore = stateManager.getGlobalStore(name); + if (globalStore != null) { + return getReadOnlyStore(globalStore); } if (!currentNode().stateStores.contains(name)) { @@ -110,24 +147,13 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re } final StateStore store = stateManager.getStore(name); - if (store instanceof TimestampedKeyValueStore) { - return new TimestampedKeyValueStoreReadWriteDecorator<>((TimestampedKeyValueStore<?, ?>) store); - } else if (store instanceof KeyValueStore) { - return new KeyValueStoreReadWriteDecorator<>((KeyValueStore<?, ?>) store); - } else if (store instanceof TimestampedWindowStore) { - return new TimestampedWindowStoreReadWriteDecorator<>((TimestampedWindowStore<?, ?>) store); - } else if (store instanceof WindowStore) { - return new WindowStoreReadWriteDecorator<>((WindowStore<?, ?>) store); - } else if (store instanceof SessionStore) { - return new SessionStoreReadWriteDecorator<>((SessionStore<?, ?>) store); - } - - return store; + return getReadWriteStore(store); } @Override public <K, V> void forward(final K key, final V value) { + throwUnsupportedOperationExceptionIfStandby("forward"); forward(key, value, SEND_TO_ALL); } @@ -136,6 +162,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re public <K, V> void forward(final K key, final V value, final int childIndex) { + throwUnsupportedOperationExceptionIfStandby("forward"); forward( key, value, @@ -147,6 +174,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re public <K, 
V> void forward(final K key, final V value, final String childName) { + throwUnsupportedOperationExceptionIfStandby("forward"); forward(key, value, To.child(childName)); } @@ -155,6 +183,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re public <K, V> void forward(final K key, final V value, final To to) { + throwUnsupportedOperationExceptionIfStandby("forward"); final ProcessorNode<?, ?> previousNode = currentNode(); final ProcessorRecordContext previousContext = recordContext; @@ -198,7 +227,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re @Override public void commit() { - task.requestCommit(); + throwUnsupportedOperationExceptionIfStandby("commit"); + streamTask.requestCommit(); } @Override @@ -206,10 +236,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re public Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) { + throwUnsupportedOperationExceptionIfStandby("schedule"); if (intervalMs < 1) { throw new IllegalArgumentException("The minimum supported scheduling interval is 1 millisecond."); } - return task.schedule(intervalMs, type, callback); + return streamTask.schedule(intervalMs, type, callback); } @SuppressWarnings("deprecation") // removing #schedule(final long intervalMs,...) 
will fix this @@ -217,414 +248,57 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException { + throwUnsupportedOperationExceptionIfStandby("schedule"); final String msgPrefix = prepareMillisCheckFailMsgPrefix(interval, "interval"); return schedule(ApiUtils.validateMillisecondDuration(interval, msgPrefix), type, callback); } - private abstract static class StateStoreReadOnlyDecorator<T extends StateStore, K, V> - extends WrappedStateStore<T, K, V> { - - static final String ERROR_MESSAGE = "Global store is read only"; - - private StateStoreReadOnlyDecorator(final T inner) { - super(inner); - } - - @Override - public void flush() { - throw new UnsupportedOperationException(ERROR_MESSAGE); - } - - @Override - public void init(final ProcessorContext context, - final StateStore root) { - throw new UnsupportedOperationException(ERROR_MESSAGE); - } - - @Override - public void close() { - throw new UnsupportedOperationException(ERROR_MESSAGE); - } - } - - private static class KeyValueStoreReadOnlyDecorator<K, V> - extends StateStoreReadOnlyDecorator<KeyValueStore<K, V>, K, V> - implements KeyValueStore<K, V> { - - private KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V> inner) { - super(inner); - } - - @Override - public V get(final K key) { - return wrapped().get(key); - } - - @Override - public KeyValueIterator<K, V> range(final K from, - final K to) { - return wrapped().range(from, to); - } - - @Override - public KeyValueIterator<K, V> all() { - return wrapped().all(); - } - - @Override - public long approximateNumEntries() { - return wrapped().approximateNumEntries(); - } - - @Override - public void put(final K key, - final V value) { - throw new UnsupportedOperationException(ERROR_MESSAGE); - } - - @Override - public V putIfAbsent(final K key, - final V value) { - throw new 
UnsupportedOperationException(ERROR_MESSAGE); - } - - @Override - public void putAll(final List<KeyValue<K, V>> entries) { - throw new UnsupportedOperationException(ERROR_MESSAGE); - } - - @Override - public V delete(final K key) { - throw new UnsupportedOperationException(ERROR_MESSAGE); - } - } - - private static class TimestampedKeyValueStoreReadOnlyDecorator<K, V> - extends KeyValueStoreReadOnlyDecorator<K, ValueAndTimestamp<V>> - implements TimestampedKeyValueStore<K, V> { - - private TimestampedKeyValueStoreReadOnlyDecorator(final TimestampedKeyValueStore<K, V> inner) { - super(inner); - } - } - - private static class WindowStoreReadOnlyDecorator<K, V> - extends StateStoreReadOnlyDecorator<WindowStore<K, V>, K, V> - implements WindowStore<K, V> { - - private WindowStoreReadOnlyDecorator(final WindowStore<K, V> inner) { - super(inner); - } - - @Deprecated - @Override - public void put(final K key, - final V value) { - throw new UnsupportedOperationException(ERROR_MESSAGE); - } - - @Override - public void put(final K key, - final V value, - final long windowStartTimestamp) { - throw new UnsupportedOperationException(ERROR_MESSAGE); - } - - @Override - public V fetch(final K key, - final long time) { - return wrapped().fetch(key, time); - } - - @Override - @Deprecated - public WindowStoreIterator<V> fetch(final K key, - final long timeFrom, - final long timeTo) { - return wrapped().fetch(key, timeFrom, timeTo); - } - - @Override - @Deprecated - public KeyValueIterator<Windowed<K>, V> fetch(final K from, - final K to, - final long timeFrom, - final long timeTo) { - return wrapped().fetch(from, to, timeFrom, timeTo); - } - - @Override - public KeyValueIterator<Windowed<K>, V> all() { - return wrapped().all(); - } - - @Override - @Deprecated - public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, - final long timeTo) { - return wrapped().fetchAll(timeFrom, timeTo); - } - } - - private static class TimestampedWindowStoreReadOnlyDecorator<K, V> - 
extends WindowStoreReadOnlyDecorator<K, ValueAndTimestamp<V>> - implements TimestampedWindowStore<K, V> { - - private TimestampedWindowStoreReadOnlyDecorator(final TimestampedWindowStore<K, V> inner) { - super(inner); - } + @Override + public String topic() { + throwUnsupportedOperationExceptionIfStandby("topic"); + return super.topic(); } - private static class SessionStoreReadOnlyDecorator<K, AGG> - extends StateStoreReadOnlyDecorator<SessionStore<K, AGG>, K, AGG> - implements SessionStore<K, AGG> { - - private SessionStoreReadOnlyDecorator(final SessionStore<K, AGG> inner) { - super(inner); - } - - @Override - public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, - final long earliestSessionEndTime, - final long latestSessionStartTime) { - return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime); - } - - @Override - public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, - final K keyTo, - final long earliestSessionEndTime, - final long latestSessionStartTime) { - return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); - } - - @Override - public void remove(final Windowed<K> sessionKey) { - throw new UnsupportedOperationException(ERROR_MESSAGE); - } - - @Override - public void put(final Windowed<K> sessionKey, - final AGG aggregate) { - throw new UnsupportedOperationException(ERROR_MESSAGE); - } - - @Override - public AGG fetchSession(final K key, final long startTime, final long endTime) { - return wrapped().fetchSession(key, startTime, endTime); - } - - @Override - public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) { - return wrapped().fetch(key); - } - - @Override - public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, - final K to) { - return wrapped().fetch(from, to); - } + @Override + public int partition() { + throwUnsupportedOperationExceptionIfStandby("partition"); + return super.partition(); } - private abstract static class 
StateStoreReadWriteDecorator<T extends StateStore, K, V> - extends WrappedStateStore<T, K, V> { - - static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams"; - - private StateStoreReadWriteDecorator(final T inner) { - super(inner); - } - - @Override - public void init(final ProcessorContext context, - final StateStore root) { - throw new UnsupportedOperationException(ERROR_MESSAGE); - } - - @Override - public void close() { - throw new UnsupportedOperationException(ERROR_MESSAGE); - } + @Override + public long offset() { + throwUnsupportedOperationExceptionIfStandby("offset"); + return super.offset(); } - static class KeyValueStoreReadWriteDecorator<K, V> - extends StateStoreReadWriteDecorator<KeyValueStore<K, V>, K, V> - implements KeyValueStore<K, V> { - - KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V> inner) { - super(inner); - } - - @Override - public V get(final K key) { - return wrapped().get(key); - } - - @Override - public KeyValueIterator<K, V> range(final K from, - final K to) { - return wrapped().range(from, to); - } - - @Override - public KeyValueIterator<K, V> all() { - return wrapped().all(); - } - - @Override - public long approximateNumEntries() { - return wrapped().approximateNumEntries(); - } - - @Override - public void put(final K key, - final V value) { - wrapped().put(key, value); - } - - @Override - public V putIfAbsent(final K key, - final V value) { - return wrapped().putIfAbsent(key, value); - } - - @Override - public void putAll(final List<KeyValue<K, V>> entries) { - wrapped().putAll(entries); - } - - @Override - public V delete(final K key) { - return wrapped().delete(key); - } + @Override + public long timestamp() { + throwUnsupportedOperationExceptionIfStandby("timestamp"); + return super.timestamp(); } - static class TimestampedKeyValueStoreReadWriteDecorator<K, V> - extends KeyValueStoreReadWriteDecorator<K, ValueAndTimestamp<V>> - implements TimestampedKeyValueStore<K, V> { - - 
TimestampedKeyValueStoreReadWriteDecorator(final TimestampedKeyValueStore<K, V> inner) { - super(inner); - } + @Override + public ProcessorNode<?, ?> currentNode() { + throwUnsupportedOperationExceptionIfStandby("currentNode"); + return super.currentNode(); } - static class WindowStoreReadWriteDecorator<K, V> - extends StateStoreReadWriteDecorator<WindowStore<K, V>, K, V> - implements WindowStore<K, V> { - - WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) { - super(inner); - } - - @Deprecated - @Override - public void put(final K key, - final V value) { - wrapped().put(key, value); - } - - @Override - public void put(final K key, - final V value, - final long windowStartTimestamp) { - wrapped().put(key, value, windowStartTimestamp); - } - - @Override - public V fetch(final K key, - final long time) { - return wrapped().fetch(key, time); - } - - @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed - @Override - public WindowStoreIterator<V> fetch(final K key, - final long timeFrom, - final long timeTo) { - return wrapped().fetch(key, timeFrom, timeTo); - } - - @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed - @Override - public KeyValueIterator<Windowed<K>, V> fetch(final K from, - final K to, - final long timeFrom, - final long timeTo) { - return wrapped().fetch(from, to, timeFrom, timeTo); - } - - @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) 
is removed - @Override - public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, - final long timeTo) { - return wrapped().fetchAll(timeFrom, timeTo); - } - - @Override - public KeyValueIterator<Windowed<K>, V> all() { - return wrapped().all(); - } + @Override + public void setRecordContext(final ProcessorRecordContext recordContext) { + throwUnsupportedOperationExceptionIfStandby("setRecordContext"); + super.setRecordContext(recordContext); } - static class TimestampedWindowStoreReadWriteDecorator<K, V> - extends WindowStoreReadWriteDecorator<K, ValueAndTimestamp<V>> - implements TimestampedWindowStore<K, V> { - - TimestampedWindowStoreReadWriteDecorator(final TimestampedWindowStore<K, V> inner) { - super(inner); - } + @Override + public ProcessorRecordContext recordContext() { + throwUnsupportedOperationExceptionIfStandby("recordContext"); + return super.recordContext(); } - static class SessionStoreReadWriteDecorator<K, AGG> - extends StateStoreReadWriteDecorator<SessionStore<K, AGG>, K, AGG> - implements SessionStore<K, AGG> { - - SessionStoreReadWriteDecorator(final SessionStore<K, AGG> inner) { - super(inner); - } - - @Override - public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, - final long earliestSessionEndTime, - final long latestSessionStartTime) { - return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime); - } - - @Override - public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, - final K keyTo, - final long earliestSessionEndTime, - final long latestSessionStartTime) { - return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); - } - - @Override - public void remove(final Windowed<K> sessionKey) { - wrapped().remove(sessionKey); - } - - @Override - public void put(final Windowed<K> sessionKey, - final AGG aggregate) { - wrapped().put(sessionKey, aggregate); - } - - @Override - public AGG fetchSession(final K key, - final long startTime, - 
final long endTime) { - return wrapped().fetchSession(key, startTime, endTime); - } - - @Override - public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) { - return wrapped().fetch(key); - } - - @Override - public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, - final K to) { - return wrapped().fetch(from, to); + private void throwUnsupportedOperationExceptionIfStandby(final String operationName) { + if (taskType() == TaskType.STANDBY) { + throw new UnsupportedOperationException( + "this should not happen: " + operationName + "() is not supported in standby tasks."); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index a3ab881..f7c1936 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -345,8 +345,8 @@ public class ProcessorStateManager implements StateManager { return sourcePartitions.contains(partition); } - // used by the changelog reader only - TaskType taskType() { + @Override + public TaskType taskType() { return taskType; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java deleted file mode 100644 index 9a94ad6..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.processor.internals; - -import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.processor.Cancellable; -import org.apache.kafka.streams.processor.PunctuationType; -import org.apache.kafka.streams.processor.Punctuator; -import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.To; -import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.state.internals.ThreadCache; - -import java.time.Duration; - -class StandbyContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier { - - StandbyContextImpl(final TaskId id, - final StreamsConfig config, - final ProcessorStateManager stateMgr, - final StreamsMetricsImpl metrics) { - super( - id, - config, - metrics, - stateMgr, - new ThreadCache( - new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())), - 0, - metrics - ) - ); - } - - - StateManager getStateMgr() { - return stateManager; - } - - @Override - public RecordCollector recordCollector() { - // return null collector specifically since in standby task it should not be called; - // if ever then we would throw NPE, which should never happen - return null; - } - - /** - * @throws 
UnsupportedOperationException on every invocation - */ - @Override - public StateStore getStateStore(final String name) { - throw new UnsupportedOperationException("this should not happen: getStateStore() not supported in standby tasks."); - } - - /** - * @throws UnsupportedOperationException on every invocation - */ - @Override - public String topic() { - throw new UnsupportedOperationException("this should not happen: topic() not supported in standby tasks."); - } - - /** - * @throws UnsupportedOperationException on every invocation - */ - @Override - public int partition() { - throw new UnsupportedOperationException("this should not happen: partition() not supported in standby tasks."); - } - - /** - * @throws UnsupportedOperationException on every invocation - */ - @Override - public long offset() { - throw new UnsupportedOperationException("this should not happen: offset() not supported in standby tasks."); - } - - /** - * @throws UnsupportedOperationException on every invocation - */ - @Override - public long timestamp() { - throw new UnsupportedOperationException("this should not happen: timestamp() not supported in standby tasks."); - } - - /** - * @throws UnsupportedOperationException on every invocation - */ - @Override - public <K, V> void forward(final K key, final V value) { - throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks."); - } - - /** - * @throws UnsupportedOperationException on every invocation - */ - @Override - public <K, V> void forward(final K key, final V value, final To to) { - throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks."); - } - - /** - * @throws UnsupportedOperationException on every invocation - */ - @Override - @Deprecated - public <K, V> void forward(final K key, final V value, final int childIndex) { - throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks."); - } - 
- /** - * @throws UnsupportedOperationException on every invocation - */ - @Override - @Deprecated - public <K, V> void forward(final K key, final V value, final String childName) { - throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks."); - } - - /** - * @throws UnsupportedOperationException on every invocation - */ - @Override - public void commit() { - throw new UnsupportedOperationException("this should not happen: commit() not supported in standby tasks."); - } - - /** - * @throws UnsupportedOperationException on every invocation - */ - @Override - @Deprecated - public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) { - throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks."); - } - - /** - * @throws UnsupportedOperationException on every invocation - */ - @Override - public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException { - throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks."); - } - - /** - * @throws UnsupportedOperationException on every invocation - */ - @Override - public ProcessorRecordContext recordContext() { - throw new UnsupportedOperationException("this should not happen: recordContext not supported in standby tasks."); - } - - /** - * @throws UnsupportedOperationException on every invocation - */ - @Override - public void setRecordContext(final ProcessorRecordContext recordContext) { - throw new UnsupportedOperationException("this should not happen: setRecordContext not supported in standby tasks."); - } - - @Override - public void setCurrentNode(final ProcessorNode currentNode) { - // no-op. can't throw as this is called on commit when the StateStores get flushed. 
- } - - /** - * @throws UnsupportedOperationException on every invocation - */ - @Override - public ProcessorNode currentNode() { - throw new UnsupportedOperationException("this should not happen: currentNode not supported in standby tasks."); - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 83e0d73..b4abd79 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -70,7 +70,7 @@ public class StandbyTask extends AbstractTask implements Task { final LogContext logContext = new LogContext(logPrefix); log = logContext.logger(getClass()); - processorContext = new StandbyContextImpl(id, config, stateMgr, metrics); + processorContext = new ProcessorContextImpl(id, config, stateMgr, metrics); closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), metrics); eosEnabled = StreamThread.eosEnabled(config); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java index 0cb49754..674ea18 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java @@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.StateStore; import java.io.File; import java.io.IOException; import java.util.Map; - +import org.apache.kafka.streams.processor.internals.Task.TaskType; interface StateManager { File baseDir(); @@ -46,6 +46,8 @@ interface StateManager { void close() throws IOException; + TaskType taskType(); + // TODO: we can remove this when consolidating global state manager into processor state manager StateStore getGlobalStore(final 
String name); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 14f4e54..8026b04 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -69,7 +69,7 @@ public class CachingKeyValueStore private void initInternal(final ProcessorContext context) { this.context = (InternalProcessorContext) context; - this.cache = this.context.getCache(); + this.cache = this.context.cache(); this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), name()); cache.addDirtyEntryFlushListener(cacheName, entries -> { for (final ThreadCache.DirtyEntry entry : entries) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index f537d4c..4976ef1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -72,7 +72,7 @@ class CachingSessionStore this.context = context; cacheName = context.taskId() + "-" + name(); - cache = context.getCache(); + cache = context.cache(); cache.addDirtyEntryFlushListener(cacheName, entries -> { for (final ThreadCache.DirtyEntry entry : entries) { putAndMaybeForward(entry, context); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index d2bd02e..e71f87e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -84,7 +84,7 @@ class CachingWindowStore Serdes.Bytes(), Serdes.ByteArray()); name = context.taskId() + "-" + name(); - cache = this.context.getCache(); + cache = this.context.cache(); cache.addDirtyEntryFlushListener(name, entries -> { for (final ThreadCache.DirtyEntry entry : entries) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java index a924af6..35f6d36 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java @@ -16,15 +16,13 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.internals.ProcessorStateManager; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StateSerdes; import java.util.List; @@ -32,7 +30,7 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, byte[], byte[]> implements KeyValueStore<Bytes, byte[]> { - StoreChangeLogger<Bytes, byte[]> changeLogger; + InternalProcessorContext context; ChangeLoggingKeyValueBytesStore(final KeyValueStore<Bytes, byte[]> inner) { super(inner); @@ -42,11 +40,7 @@ public class ChangeLoggingKeyValueBytesStore public void init(final ProcessorContext context, final StateStore root) { 
super.init(context, root); - final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()); - changeLogger = new StoreChangeLogger<>( - name(), - context, - new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray())); + this.context = (InternalProcessorContext) context; // if the inner store is an LRU cache, add the eviction listener to log removed record if (wrapped() instanceof MemoryLRUCache) { @@ -113,6 +107,6 @@ public class ChangeLoggingKeyValueBytesStore void log(final Bytes key, final byte[] value) { - changeLogger.logChange(key, value); + context.logChange(name(), key, value, context.timestamp()); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java index 361f8a5..cc586d3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java @@ -16,15 +16,13 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; -import org.apache.kafka.streams.state.StateSerdes; /** * Simple wrapper around a {@link SessionStore} to support writing @@ -34,7 +32,7 @@ class ChangeLoggingSessionBytesStore extends WrappedStateStore<SessionStore<Bytes, byte[]>, byte[], byte[]> implements SessionStore<Bytes, byte[]> { - 
private StoreChangeLogger<Bytes, byte[]> changeLogger; + private InternalProcessorContext context; ChangeLoggingSessionBytesStore(final SessionStore<Bytes, byte[]> bytesStore) { super(bytesStore); @@ -43,16 +41,9 @@ class ChangeLoggingSessionBytesStore @Override public void init(final ProcessorContext context, final StateStore root) { super.init(context, root); - final String topic = ProcessorStateManager.storeChangelogTopic( - context.applicationId(), - name()); - changeLogger = new StoreChangeLogger<>( - name(), - context, - new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray())); + this.context = (InternalProcessorContext) context; } - @Override public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) { return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime); @@ -66,14 +57,13 @@ class ChangeLoggingSessionBytesStore @Override public void remove(final Windowed<Bytes> sessionKey) { wrapped().remove(sessionKey); - changeLogger.logChange(SessionKeySchema.toBinary(sessionKey), null); + context.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, context.timestamp()); } @Override public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) { wrapped().put(sessionKey, aggregate); - changeLogger.logChange(SessionKeySchema.toBinary(sessionKey), aggregate); - + context.logChange(name(), SessionKeySchema.toBinary(sessionKey), aggregate, context.timestamp()); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java index 02e4c6a..7cdac97 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java @@ -32,9 +32,9 @@ public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKey void log(final Bytes key, final byte[] valueAndTimestamp) { if (valueAndTimestamp != null) { - changeLogger.logChange(key, rawValue(valueAndTimestamp), timestamp(valueAndTimestamp)); + context.logChange(name(), key, rawValue(valueAndTimestamp), timestamp(valueAndTimestamp)); } else { - changeLogger.logChange(key, null); + context.logChange(name(), key, null, context.timestamp()); } } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java index 94362d4..3714150 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java @@ -33,9 +33,9 @@ class ChangeLoggingTimestampedWindowBytesStore extends ChangeLoggingWindowBytesS void log(final Bytes key, final byte[] valueAndTimestamp) { if (valueAndTimestamp != null) { - changeLogger.logChange(key, rawValue(valueAndTimestamp), timestamp(valueAndTimestamp)); + context.logChange(name(), key, rawValue(valueAndTimestamp), timestamp(valueAndTimestamp)); } else { - changeLogger.logChange(key, null); + context.logChange(name(), key, null, context.timestamp()); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java index 8a9b91a..a04eb2e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java @@ -16,14 +16,12 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.internals.ProcessorStateManager; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; @@ -36,11 +34,9 @@ class ChangeLoggingWindowBytesStore implements WindowStore<Bytes, byte[]> { private final boolean retainDuplicates; - private ProcessorContext context; + InternalProcessorContext context; private int seqnum = 0; - StoreChangeLogger<Bytes, byte[]> changeLogger; - ChangeLoggingWindowBytesStore(final WindowStore<Bytes, byte[]> bytesStore, final boolean retainDuplicates) { super(bytesStore); @@ -50,13 +46,8 @@ class ChangeLoggingWindowBytesStore @Override public void init(final ProcessorContext context, final StateStore root) { - this.context = context; + this.context = (InternalProcessorContext) context; super.init(context, root); - final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()); - changeLogger = new StoreChangeLogger<>( - name(), - context, - new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray())); } @Override @@ -114,7 +105,7 @@ class ChangeLoggingWindowBytesStore void log(final Bytes key, final byte[] value) { - changeLogger.logChange(key, value); + context.logChange(name(), key, value, context.timestamp()); } private int maybeUpdateSeqnumForDups() { diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java deleted file mode 100644 index 7358120..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.state.internals; - -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.internals.ProcessorStateManager; -import org.apache.kafka.streams.processor.internals.RecordCollector; -import org.apache.kafka.streams.state.StateSerdes; - -/** - * Note that the use of array-typed keys is discouraged because they result in incorrect caching behavior. - * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class, - * i.e. use {@code StoreChangeLogger<Bytes, ...>} rather than {@code StoreChangeLogger<byte[], ...>}. 
- * - * @param <K> - * @param <V> - */ -class StoreChangeLogger<K, V> { - - private final String topic; - private final int partition; - private final ProcessorContext context; - private final RecordCollector collector; - private final Serializer<K> keySerializer; - private final Serializer<V> valueSerializer; - - StoreChangeLogger(final String storeName, - final ProcessorContext context, - final StateSerdes<K, V> serialization) { - this(storeName, context, context.taskId().partition, serialization); - } - - private StoreChangeLogger(final String storeName, - final ProcessorContext context, - final int partition, - final StateSerdes<K, V> serialization) { - topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName); - this.context = context; - this.partition = partition; - this.collector = ((RecordCollector.Supplier) context).recordCollector(); - keySerializer = serialization.keySerializer(); - valueSerializer = serialization.valueSerializer(); - } - - void logChange(final K key, - final V value) { - logChange(key, value, context.timestamp()); - } - - void logChange(final K key, - final V value, - final long timestamp) { - // Sending null headers to changelog topics (KIP-244) - collector.send(topic, key, value, null, partition, timestamp, keySerializer, valueSerializer); - } -} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java index 4a3ee7b..72b415f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; import 
org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.Cancellable; @@ -180,7 +181,7 @@ public class AbstractProcessorContextTest { public void appConfigsShouldReturnUnrecognizedValues() { assertThat( context.appConfigs().get("user.supplied.config"), - equalTo("user-suppplied-value")); + equalTo("user-supplied-value")); } @@ -190,7 +191,7 @@ public class AbstractProcessorContextTest { config = getStreamsConfig(); // Value must be a string to test className -> class conversion config.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBConfigSetter.class.getName()); - config.put("user.supplied.config", "user-suppplied-value"); + config.put("user.supplied.config", "user-supplied-value"); } TestProcessorContext(final MockStreamsMetrics metrics) { @@ -233,5 +234,12 @@ public class AbstractProcessorContextTest { @Override public void commit() {} + + @Override + public void logChange(final String storeName, + final Bytes key, + final byte[] value, + final long timestamp) { + } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java index 8443e55..e4fe6ed 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.state.KeyValueStore; import 
org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.TimestampedKeyValueStore; @@ -72,6 +73,7 @@ public class GlobalProcessorContextImplTest { expect(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME)).andReturn(mock(TimestampedWindowStore.class)); expect(stateManager.getGlobalStore(GLOBAL_SESSION_STORE_NAME)).andReturn(mock(SessionStore.class)); expect(stateManager.getGlobalStore(UNKNOWN_STORE)).andReturn(null); + expect(stateManager.taskType()).andStubReturn(TaskType.GLOBAL); replay(stateManager); globalContext = new GlobalProcessorContextImpl( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java index 5b52e9d..41cfdfa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java @@ -16,13 +16,18 @@ */ package org.apache.kafka.streams.processor.internals; +import java.time.Duration; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -33,6 +38,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp; 
import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.streams.state.internals.ThreadCache; +import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; @@ -43,6 +49,8 @@ import java.util.List; import java.util.function.Consumer; import static java.util.Arrays.asList; +import static org.apache.kafka.streams.processor.internals.ProcessorContextImpl.BYTES_KEY_SERIALIZER; +import static org.apache.kafka.streams.processor.internals.ProcessorContextImpl.BYTEARRAY_VALUE_SERIALIZER; import static org.easymock.EasyMock.anyLong; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.anyString; @@ -50,13 +58,19 @@ import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.mock; import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class ProcessorContextImplTest { private ProcessorContextImpl context; + private final StreamsConfig streamsConfig = streamsConfigMock(); + + private RecordCollector recordCollector = mock(RecordCollector.class); + private static final String KEY = "key"; private static final long VALUE = 42L; private static final ValueAndTimestamp<Long> VALUE_AND_TIMESTAMP = ValueAndTimestamp.make(42L, 21L); @@ -99,13 +113,8 @@ public class ProcessorContextImplTest { timestampedIters.add(i, mock(KeyValueIterator.class)); } - final StreamsConfig streamsConfig = mock(StreamsConfig.class); - expect(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).andReturn("add-id"); - expect(streamsConfig.defaultValueSerde()).andReturn(Serdes.ByteArray()); - expect(streamsConfig.defaultKeySerde()).andReturn(Serdes.ByteArray()); - replay(streamsConfig); - final 
ProcessorStateManager stateManager = mock(ProcessorStateManager.class); + expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE); expect(stateManager.getGlobalStore("GlobalKeyValueStore")).andReturn(keyValueStoreMock()); expect(stateManager.getGlobalStore("GlobalTimestampedKeyValueStore")).andReturn(timestampedKeyValueStoreMock()); @@ -125,7 +134,7 @@ public class ProcessorContextImplTest { mock(TaskId.class), mock(StreamTask.class), streamsConfig, - mock(RecordCollector.class), + recordCollector, stateManager, mock(StreamsMetricsImpl.class), mock(ThreadCache.class) @@ -140,6 +149,18 @@ public class ProcessorContextImplTest { "LocalSessionStore")))); } + private ProcessorContextImpl getStandbyContext() { + final ProcessorStateManager stateManager = EasyMock.createNiceMock(ProcessorStateManager.class); + expect(stateManager.taskType()).andStubReturn(TaskType.STANDBY); + replay(stateManager); + return new ProcessorContextImpl( + mock(TaskId.class), + streamsConfig, + stateManager, + mock(StreamsMetricsImpl.class) + ); + } + @Test public void globalKeyValueStoreShouldBeReadOnly() { doTest("GlobalKeyValueStore", (Consumer<KeyValueStore<String, Long>>) store -> { @@ -347,6 +368,165 @@ public class ProcessorContextImplTest { }); } + @Test + public void shouldNotSendRecordHeadersToChangelogTopic() { + final Bytes key = Bytes.wrap("key".getBytes()); + final byte[] value = "zero".getBytes(); + + recordCollector.send(null, key, value, null, 0, 42L, BYTES_KEY_SERIALIZER, BYTEARRAY_VALUE_SERIALIZER); + + replay(recordCollector); + context.logChange("Store", key, value, 42L); + + verify(recordCollector); + } + + @Test + public void shouldThrowUnsupportedOperationExceptionOnLogChange() { + context = getStandbyContext(); + assertThrows( + UnsupportedOperationException.class, + () -> context.logChange("Store", Bytes.wrap("k".getBytes()), null, 0L) + ); + } + + @Test + public void shouldThrowUnsupportedOperationExceptionOnGetStateStore() { + context = getStandbyContext(); + 
assertThrows( + UnsupportedOperationException.class, + () -> context.getStateStore("store") + ); + } + + @Test + public void shouldThrowUnsupportedOperationExceptionOnForward() { + context = getStandbyContext(); + assertThrows( + UnsupportedOperationException.class, + () -> context.forward("key", "value") + ); + } + + @SuppressWarnings("deprecation") + @Test + public void shouldThrowUnsupportedOperationExceptionOnForwardWithChildIndex() { + context = getStandbyContext(); + assertThrows( + UnsupportedOperationException.class, + () -> context.forward("key", "value", 0) + ); + } + + @SuppressWarnings("deprecation") + @Test + public void shouldThrowUnsupportedOperationExceptionOnForwardWithChildName() { + context = getStandbyContext(); + assertThrows( + UnsupportedOperationException.class, + () -> context.forward("key", "value", "child-name") + ); + } + + @Test + public void shouldThrowUnsupportedOperationExceptionOnForwardWithTo() { + context = getStandbyContext(); + assertThrows( + UnsupportedOperationException.class, + () -> context.forward("key", "value", To.child("child-name")) + ); + } + + @Test + public void shouldThrowUnsupportedOperationExceptionOnCommit() { + context = getStandbyContext(); + assertThrows( + UnsupportedOperationException.class, + () -> context.commit() + ); + } + + @SuppressWarnings("deprecation") + @Test + public void shouldThrowUnsupportedOperationExceptionOnScheduleWithInterval() { + context = getStandbyContext(); + assertThrows( + UnsupportedOperationException.class, + () -> context.schedule(100L, PunctuationType.STREAM_TIME, t -> { }) + ); + } + + @Test + public void shouldThrowUnsupportedOperationExceptionOnSchedule() { + context = getStandbyContext(); + assertThrows( + UnsupportedOperationException.class, + () -> context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, t -> { }) + ); + } + + @Test + public void shouldThrowUnsupportedOperationExceptionOnTopic() { + context = getStandbyContext(); + assertThrows( + 
UnsupportedOperationException.class, + () -> context.topic() + ); + } + @Test + public void shouldThrowUnsupportedOperationExceptionOnPartition() { + context = getStandbyContext(); + assertThrows( + UnsupportedOperationException.class, + () -> context.partition() + ); + } + + @Test + public void shouldThrowUnsupportedOperationExceptionOnOffset() { + context = getStandbyContext(); + assertThrows( + UnsupportedOperationException.class, + () -> context.offset() + ); + } + + @Test + public void shouldThrowUnsupportedOperationExceptionOnTimestamp() { + context = getStandbyContext(); + assertThrows( + UnsupportedOperationException.class, + () -> context.timestamp() + ); + } + + @Test + public void shouldThrowUnsupportedOperationExceptionOnCurrentNode() { + context = getStandbyContext(); + assertThrows( + UnsupportedOperationException.class, + () -> context.currentNode() + ); + } + + @Test + public void shouldThrowUnsupportedOperationExceptionOnSetRecordContext() { + context = getStandbyContext(); + assertThrows( + UnsupportedOperationException.class, + () -> context.setRecordContext(mock(ProcessorRecordContext.class)) + ); + } + + @Test + public void shouldThrowUnsupportedOperationExceptionOnRecordContext() { + context = getStandbyContext(); + assertThrows( + UnsupportedOperationException.class, + () -> context.recordContext() + ); + } + @SuppressWarnings("unchecked") private KeyValueStore<String, Long> keyValueStoreMock() { final KeyValueStore<String, Long> keyValueStoreMock = mock(KeyValueStore.class); @@ -511,6 +691,15 @@ public class ProcessorContextImplTest { return sessionStore; } + private StreamsConfig streamsConfigMock() { + final StreamsConfig streamsConfig = mock(StreamsConfig.class); + expect(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).andStubReturn("add-id"); + expect(streamsConfig.defaultValueSerde()).andStubReturn(Serdes.ByteArray()); + expect(streamsConfig.defaultKeySerde()).andStubReturn(Serdes.ByteArray()); + replay(streamsConfig); + 
return streamsConfig; + } + private void initStateStoreMock(final StateStore stateStore) { expect(stateStore.name()).andReturn(STORE_NAME); expect(stateStore.persistent()).andReturn(true); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java index 45e0165..44f01b0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.internals.ThreadCache; import org.junit.Before; @@ -43,14 +44,18 @@ public class ProcessorContextTest { expect(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).andReturn("add-id"); expect(streamsConfig.defaultValueSerde()).andReturn(Serdes.ByteArray()); expect(streamsConfig.defaultKeySerde()).andReturn(Serdes.ByteArray()); - replay(streamsConfig); + + final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); + expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE); + + replay(streamsConfig, stateManager); context = new ProcessorContextImpl( mock(TaskId.class), mock(StreamTask.class), streamsConfig, mock(RecordCollector.class), - mock(ProcessorStateManager.class), + stateManager, mock(StreamsMetricsImpl.class), mock(ThreadCache.class) ); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index c993d1d..7c14f23 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.test.MockKeyValueStore; import org.apache.kafka.test.MockKeyValueStoreBuilder; @@ -144,7 +145,9 @@ public class StandbyTaskTest { public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOException { stateDirectory = EasyMock.createNiceMock(StateDirectory.class); EasyMock.expect(stateDirectory.lock(taskId)).andReturn(false); - EasyMock.replay(stateDirectory); + EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.STANDBY); + + EasyMock.replay(stateDirectory, stateManager); task = createStandbyTask(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java index bf9abee..cf1eafd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.StateStore; import java.io.File; import java.util.Map; +import org.apache.kafka.streams.processor.internals.Task.TaskType; public class StateManagerStub implements StateManager { @@ -59,4 +60,9 @@ public class StateManagerStub implements StateManager { @Override public void 
checkpoint(final Map<TopicPartition, Long> offsets) {} + @Override + public TaskType taskType() { + return null; + } + } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 89740c3..7a39121 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -249,7 +249,7 @@ public class KeyValueStoreTestDriver<K, V> { final ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1024 * 1024L, metrics()); @Override - public ThreadCache getCache() { + public ThreadCache cache() { return cache; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java index 79d66bf..5ab035c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java @@ -31,12 +31,6 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.nullValue; -import static org.hamcrest.MatcherAssert.assertThat; - - @RunWith(EasyMockRunner.class) public class ChangeLoggingSessionBytesStoreTest { @@ -75,13 +69,15 @@ public class ChangeLoggingSessionBytesStoreTest { init(); - store.put(key1, value1); + final Bytes binaryKey = SessionKeySchema.toBinary(key1); - assertThat(collector.collected().size(), equalTo(1)); - assertThat(collector.collected().get(0).key(), equalTo(SessionKeySchema.toBinary(key1))); - assertThat(collector.collected().get(0).value(), equalTo(value1)); + EasyMock.reset(context); + 
context.logChange(store.name(), binaryKey, value1, 0L); - EasyMock.verify(inner); + EasyMock.replay(context); + store.put(key1, value1); + + EasyMock.verify(inner, context); } @Test @@ -93,11 +89,14 @@ public class ChangeLoggingSessionBytesStoreTest { store.remove(key1); final Bytes binaryKey = SessionKeySchema.toBinary(key1); - assertThat(collector.collected().size(), equalTo(1)); - assertThat(collector.collected().get(0).key(), equalTo(binaryKey)); - assertThat(collector.collected().get(0).value(), nullValue()); - EasyMock.verify(inner); + EasyMock.reset(context); + context.logChange(store.name(), binaryKey, null, 0L); + + EasyMock.replay(context); + store.remove(key1); + + EasyMock.verify(inner, context); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java index bde6d05..4a240b1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java @@ -31,9 +31,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import static java.time.Instant.ofEpochMilli; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; - @RunWith(EasyMockRunner.class) public class ChangeLoggingTimestampedWindowBytesStoreTest { @@ -75,15 +72,15 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest { init(); - store.put(bytesKey, valueAndTimestamp); - final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0); - assertThat(collector.collected().size(), equalTo(1)); - assertThat(collector.collected().get(0).key(), equalTo(key)); - assertThat(collector.collected().get(0).value(), equalTo(value)); - assertThat(collector.collected().get(0).timestamp(), 
equalTo(42L)); - EasyMock.verify(inner); + EasyMock.reset(context); + context.logChange(store.name(), key, value, 42); + + EasyMock.replay(context); + store.put(bytesKey, valueAndTimestamp); + + EasyMock.verify(inner, context); } @Test @@ -118,20 +115,20 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest { EasyMock.expectLastCall().times(2); init(); - store.put(bytesKey, valueAndTimestamp); - store.put(bytesKey, valueAndTimestamp); final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1); final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2); - assertThat(collector.collected().size(), equalTo(2)); - assertThat(collector.collected().get(0).key(), equalTo(key1)); - assertThat(collector.collected().get(0).value(), equalTo(value)); - assertThat(collector.collected().get(0).timestamp(), equalTo(42L)); - assertThat(collector.collected().get(1).key(), equalTo(key2)); - assertThat(collector.collected().get(1).value(), equalTo(value)); - assertThat(collector.collected().get(1).timestamp(), equalTo(42L)); - EasyMock.verify(inner); + EasyMock.reset(context); + context.logChange(store.name(), key1, value, 42L); + context.logChange(store.name(), key2, value, 42L); + + EasyMock.replay(context); + + store.put(bytesKey, valueAndTimestamp); + store.put(bytesKey, valueAndTimestamp); + + EasyMock.verify(inner, context); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java index ce60548..f4cb523 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java @@ -31,9 +31,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import static java.time.Instant.ofEpochMilli; -import static org.hamcrest.CoreMatchers.equalTo; 
-import static org.hamcrest.MatcherAssert.assertThat; - @RunWith(EasyMockRunner.class) public class ChangeLoggingWindowBytesStoreTest { @@ -74,15 +71,16 @@ public class ChangeLoggingWindowBytesStoreTest { init(); - store.put(bytesKey, value); - final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0); - assertThat(collector.collected().size(), equalTo(1)); - assertThat(collector.collected().get(0).key(), equalTo(key)); - assertThat(collector.collected().get(0).value(), equalTo(value)); - assertThat(collector.collected().get(0).timestamp(), equalTo(0L)); - EasyMock.verify(inner); + EasyMock.reset(context); + EasyMock.expect(context.timestamp()).andStubReturn(0L); + context.logChange(store.name(), key, value, 0L); + + EasyMock.replay(context); + store.put(bytesKey, value); + + EasyMock.verify(inner, context); } @Test @@ -113,24 +111,26 @@ public class ChangeLoggingWindowBytesStoreTest { @SuppressWarnings("deprecation") public void shouldRetainDuplicatesWhenSet() { store = new ChangeLoggingWindowBytesStore(inner, true); + inner.put(bytesKey, value, 0); EasyMock.expectLastCall().times(2); init(); - store.put(bytesKey, value); - store.put(bytesKey, value); final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1); final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2); - assertThat(collector.collected().size(), equalTo(2)); - assertThat(collector.collected().get(0).key(), equalTo(key1)); - assertThat(collector.collected().get(0).value(), equalTo(value)); - assertThat(collector.collected().get(0).timestamp(), equalTo(0L)); - assertThat(collector.collected().get(1).key(), equalTo(key2)); - assertThat(collector.collected().get(1).value(), equalTo(value)); - assertThat(collector.collected().get(1).timestamp(), equalTo(0L)); - EasyMock.verify(inner); + EasyMock.reset(context); + EasyMock.expect(context.timestamp()).andStubReturn(0L); + context.logChange(store.name(), key1, value, 0L); + context.logChange(store.name(), key2, value, 0L); + + 
EasyMock.replay(context); + + store.put(bytesKey, value); + store.put(bytesKey, value); + + EasyMock.verify(inner, context); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java deleted file mode 100644 index c5a89da..0000000 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.streams.state.internals; - - -import org.apache.kafka.common.header.internals.RecordHeader; -import org.apache.kafka.common.record.Record; -import org.apache.kafka.streams.state.StateSerdes; -import org.apache.kafka.test.InternalMockProcessorContext; -import org.apache.kafka.test.MockRecordCollector; -import org.junit.Test; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.nullValue; -import static org.hamcrest.MatcherAssert.assertThat; - - -public class StoreChangeLoggerTest { - - private final String topic = "topic"; - - private final MockRecordCollector collector = new MockRecordCollector(); - private final InternalMockProcessorContext context = new InternalMockProcessorContext( - StateSerdes.withBuiltinTypes(topic, Integer.class, String.class), - collector); - - private final StoreChangeLogger<Integer, String> changeLogger = - new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class)); - - @Test - public void testAddRemove() { - context.setTime(1); - changeLogger.logChange(0, "zero"); - context.setTime(5); - changeLogger.logChange(1, "one"); - changeLogger.logChange(2, "two"); - changeLogger.logChange(3, "three", 42L); - context.setTime(9); - changeLogger.logChange(0, null); - - assertThat(collector.collected().size(), equalTo(5)); - assertThat(collector.collected().get(0).key(), equalTo(0)); - assertThat(collector.collected().get(0).value(), equalTo("zero")); - assertThat(collector.collected().get(0).timestamp(), equalTo(1L)); - assertThat(collector.collected().get(1).key(), equalTo(1)); - assertThat(collector.collected().get(1).value(), equalTo("one")); - assertThat(collector.collected().get(1).timestamp(), equalTo(5L)); - assertThat(collector.collected().get(2).key(), equalTo(2)); - assertThat(collector.collected().get(2).value(), equalTo("two")); - assertThat(collector.collected().get(2).timestamp(), equalTo(5L)); - 
assertThat(collector.collected().get(3).key(), equalTo(3)); - assertThat(collector.collected().get(3).value(), equalTo("three")); - assertThat(collector.collected().get(3).timestamp(), equalTo(42L)); - assertThat(collector.collected().get(4).key(), equalTo(0)); - assertThat(collector.collected().get(4).value(), nullValue()); - assertThat(collector.collected().get(4).timestamp(), equalTo(9L)); - } - - @Test - public void shouldNotSendRecordHeadersToChangelogTopic() { - context.headers().add(new RecordHeader("key", "value".getBytes())); - changeLogger.logChange(0, "zero", 42L); - - assertThat(collector.collected().size(), equalTo(1)); - assertThat(collector.collected().get(0).key(), equalTo(0)); - assertThat(collector.collected().get(0).value(), equalTo("zero")); - assertThat(collector.collected().get(0).timestamp(), equalTo(42L)); - assertThat(collector.collected().get(0).headers().toArray(), equalTo(Record.EMPTY_HEADERS)); - } -} diff --git a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java index 3def253..ae825bc 100644 --- a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java +++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import java.io.File; import java.util.Map; import java.util.Set; +import org.apache.kafka.streams.processor.internals.Task.TaskType; public class GlobalStateManagerStub implements GlobalStateManager { @@ -82,4 +83,9 @@ public class GlobalStateManagerStub implements GlobalStateManager { public Map<TopicPartition, Long> changelogOffsets() { return offsets; } + + @Override + public TaskType taskType() { + return TaskType.GLOBAL; + } } diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index 
8cd784f..81876e4 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; @@ -38,6 +39,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; import org.apache.kafka.streams.processor.internals.RecordCollector; +import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.ToInternal; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.StateSerdes; @@ -349,6 +351,27 @@ public class InternalMockProcessorContext return recordContext.headers(); } + @Override + public TaskType taskType() { + return TaskType.ACTIVE; + } + + @Override + public void logChange(final String storeName, + final Bytes key, + final byte[] value, + final long timestamp) { + recordCollector().send( + storeName + "-changelog", + key, + value, + null, + taskId().partition, + timestamp, + BYTES_KEY_SERIALIZER, + BYTEARRAY_VALUE_SERIALIZER); + } + public StateRestoreListener getRestoreListener(final String storeName) { return getStateRestoreListener(restoreFuncs.get(storeName)); } diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java index 4b7cf49..e375085 100644 --- 
a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.test; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.MockProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; @@ -24,6 +25,7 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.RecordCollector; +import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.internals.ThreadCache; @@ -38,6 +40,7 @@ public class MockInternalProcessorContext extends MockProcessorContext implement private ProcessorNode currentNode; private RecordCollector recordCollector; private long currentSystemTimeMs; + private TaskType taskType = TaskType.ACTIVE; public MockInternalProcessorContext() { } @@ -88,7 +91,7 @@ public class MockInternalProcessorContext extends MockProcessorContext implement } @Override - public ThreadCache getCache() { + public ThreadCache cache() { return null; } @@ -116,4 +119,16 @@ public class MockInternalProcessorContext extends MockProcessorContext implement public StateRestoreCallback stateRestoreCallback(final String storeName) { return restoreCallbacks.get(storeName); } + + @Override + public TaskType taskType() { + return taskType; + } + + @Override + public void logChange(final String storeName, + final Bytes key, + final byte[] value, + final long timestamp) { + } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java 
b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java index 77dd418..da8b7b4 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java @@ -17,6 +17,7 @@ package org.apache.kafka.test; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.PunctuationType; @@ -32,6 +33,7 @@ import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import org.apache.kafka.streams.processor.internals.Task.TaskType; public class NoOpProcessorContext extends AbstractProcessorContext { public boolean initialized; @@ -101,5 +103,18 @@ public class NoOpProcessorContext extends AbstractProcessorContext { @Override public void register(final StateStore store, - final StateRestoreCallback stateRestoreCallback) {} + final StateRestoreCallback stateRestoreCallback) { + } + + @Override + public TaskType taskType() { + return TaskType.ACTIVE; + } + + @Override + public void logChange(final String storeName, + final Bytes key, + final byte[] value, + final long timestamp) { + } } diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index e78f966..8475172 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -991,7 +991,7 @@ public class TopologyTestDriver implements Closeable { private StateStore getStateStore(final String name, final boolean throwForBuiltInStores) { if (task != null) { - final StateStore stateStore = ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name); + final 
StateStore stateStore = ((ProcessorContextImpl) task.context()).stateManager().getStore(name); if (stateStore != null) { if (throwForBuiltInStores) { throwIfBuiltInStore(stateStore); diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java index 73da6ef..b16eb32 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java @@ -75,6 +75,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S private final List<CapturedForward> capturedForwards = new LinkedList<>(); private boolean committed = false; + /** * {@link CapturedPunctuator} holds captured punctuators, along with their scheduling information. */