[ https://issues.apache.org/jira/browse/KAFKA-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716687#comment-16716687 ]
ASF GitHub Bot commented on KAFKA-6970: --------------------------------------- mjsax closed pull request #6016: KAFKA-6970: All standard state stores guarded with read only wrapper URL: https://github.com/apache/kafka/pull/6016 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java index ff3ef44894b..99ba0f6ce06 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java @@ -50,9 +50,18 @@ private CachedStateStore cachedStateStore(final StateStore store) { if (store instanceof CachedStateStore) { return (CachedStateStore) store; - } else if (store instanceof WrappedStateStore - && ((WrappedStateStore) store).wrappedStore() instanceof CachedStateStore) { - return (CachedStateStore) ((WrappedStateStore) store).wrappedStore(); + } else if (store instanceof WrappedStateStore) { + StateStore wrapped = ((WrappedStateStore) store).wrappedStore(); + + while (wrapped instanceof WrappedStateStore && !(wrapped instanceof CachedStateStore)) { + wrapped = ((WrappedStateStore) wrapped).wrappedStore(); + } + + if (!(wrapped instanceof CachedStateStore)) { + return null; + } + + return (CachedStateStore) wrapped; } return null; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index c79ec35328a..e7dd4dbc42a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.internals.ApiUtils; @@ -37,6 +38,7 @@ import java.time.Duration; import java.util.List; +import org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore; import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; @@ -102,7 +104,16 @@ public StateStore getStateStore(final String name) { "please file a bug report at https://issues.apache.org/jira/projects/KAFKA."); } - return stateManager.getStore(name); + final StateStore store = stateManager.getStore(name); + if (store instanceof KeyValueStore) { + return new KeyValueStoreReadWriteDecorator((KeyValueStore) store); + } else if (store instanceof WindowStore) { + return new WindowStoreReadWriteDecorator((WindowStore) store); + } else if (store instanceof SessionStore) { + return new SessionStoreReadWriteDecorator((SessionStore) store); + } + + return store; } @SuppressWarnings("unchecked") @@ -196,23 +207,16 @@ public long streamTime() { return streamTimeSupplier.get(); } - private abstract static class StateStoreReadOnlyDecorator<T extends StateStore> implements StateStore { + private abstract static class StateStoreReadOnlyDecorator<T extends StateStore> extends AbstractStateStore { static final String ERROR_MESSAGE = "Global store is read only"; - final T underlying; - - StateStoreReadOnlyDecorator(final T underlying) { - this.underlying = underlying; - } - - @Override - public String name() { - return underlying.name(); + StateStoreReadOnlyDecorator(final T inner) { + super(inner); } - @Override - public void init(final ProcessorContext context, final StateStore root) { - underlying.init(context, root); + @SuppressWarnings("unchecked") + T getInner() { + return (T) wrappedStore(); } @Override @@ -221,44 +225,39 @@ public void flush() { } @Override - public void close() { - underlying.close(); - } - - @Override - public boolean persistent() { - return underlying.persistent(); + public void init(final ProcessorContext context, final StateStore root) { + throw new UnsupportedOperationException(ERROR_MESSAGE); } @Override - public boolean isOpen() { - return underlying.isOpen(); + public void close() { + throw new UnsupportedOperationException(ERROR_MESSAGE); } } private static class KeyValueStoreReadOnlyDecorator<K, V> extends StateStoreReadOnlyDecorator<KeyValueStore<K, V>> implements KeyValueStore<K, V> { - KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V> underlying) { - super(underlying); + KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V> inner) { + super(inner); } @Override public V get(final K key) { - return underlying.get(key); + return getInner().get(key); } @Override public KeyValueIterator<K, V> range(final K from, final K to) { - return underlying.range(from, to); + return getInner().range(from, to); } @Override public KeyValueIterator<K, V> all() { - return underlying.all(); + return getInner().all(); } @Override public long approximateNumEntries() { - return underlying.approximateNumEntries(); + return getInner().approximateNumEntries(); } @Override @@ -283,8 +282,8 @@ public V delete(final K key) { } private static class WindowStoreReadOnlyDecorator<K, V> extends StateStoreReadOnlyDecorator<WindowStore<K, V>> implements WindowStore<K, V> { - WindowStoreReadOnlyDecorator(final WindowStore<K, V> underlying) { - super(underlying); + WindowStoreReadOnlyDecorator(final WindowStore<K, V> inner) { + super(inner); } @Override @@ -299,46 +298,46 @@ public void put(final K key, final V value, final long windowStartTimestamp) { @Override public V fetch(final K key, final long time) { - return underlying.fetch(key, time); + return getInner().fetch(key, time); } @Deprecated @Override public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) { - return underlying.fetch(key, timeFrom, timeTo); + return getInner().fetch(key, timeFrom, timeTo); } @Deprecated @Override public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) { - return underlying.fetch(from, to, timeFrom, timeTo); + return getInner().fetch(from, to, timeFrom, timeTo); } @Override public KeyValueIterator<Windowed<K>, V> all() { - return underlying.all(); + return getInner().all(); } @Deprecated @Override public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) { - return underlying.fetchAll(timeFrom, timeTo); + return getInner().fetchAll(timeFrom, timeTo); } } private static class SessionStoreReadOnlyDecorator<K, AGG> extends StateStoreReadOnlyDecorator<SessionStore<K, AGG>> implements SessionStore<K, AGG> { - SessionStoreReadOnlyDecorator(final SessionStore<K, AGG> underlying) { - super(underlying); + SessionStoreReadOnlyDecorator(final SessionStore<K, AGG> inner) { + super(inner); } @Override public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { - return underlying.findSessions(key, earliestSessionEndTime, latestSessionStartTime); + return getInner().findSessions(key, earliestSessionEndTime, latestSessionStartTime); } @Override public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { - return underlying.findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); + return getInner().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); } @Override @@ -353,12 +352,161 @@ public void put(final Windowed<K> sessionKey, final AGG aggregate) { @Override public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) { - return underlying.fetch(key); + return getInner().fetch(key); + } + + @Override + public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to) { + return getInner().fetch(from, to); + } + } + + private abstract static class StateStoreReadWriteDecorator<T extends StateStore> extends AbstractStateStore { + static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams"; + + StateStoreReadWriteDecorator(final T inner) { + super(inner); + } + + @SuppressWarnings("unchecked") + T wrapped() { + return (T) super.wrappedStore(); + } + + @Override + public void init(final ProcessorContext context, final StateStore root) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + + @Override + public void close() { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + } + + private static class KeyValueStoreReadWriteDecorator<K, V> extends StateStoreReadWriteDecorator<KeyValueStore<K, V>> implements KeyValueStore<K, V> { + KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V> inner) { + super(inner); + } + + @Override + public V get(final K key) { + return wrapped().get(key); + } + + @Override + public KeyValueIterator<K, V> range(final K from, final K to) { + return wrapped().range(from, to); + } + + @Override + public KeyValueIterator<K, V> all() { + return wrapped().all(); + } + + @Override + public long approximateNumEntries() { + return wrapped().approximateNumEntries(); + } + + @Override + public void put(final K key, final V value) { + wrapped().put(key, value); + } + + @Override + public V putIfAbsent(final K key, final V value) { + return wrapped().putIfAbsent(key, value); + } + + @Override + public void putAll(final List<KeyValue<K, V>> entries) { + wrapped().putAll(entries); + } + + @Override + public V delete(final K key) { + return wrapped().delete(key); + } + } + + private static class WindowStoreReadWriteDecorator<K, V> extends StateStoreReadWriteDecorator<WindowStore<K, V>> implements WindowStore<K, V> { + WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) { + super(inner); + } + + @Override + public void put(final K key, final V value) { + wrapped().put(key, value); + } + + @Override + public void put(final K key, final V value, final long windowStartTimestamp) { + wrapped().put(key, value, windowStartTimestamp); + } + + @Override + public V fetch(final K key, final long time) { + return wrapped().fetch(key, time); + } + + @Deprecated + @Override + public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) { + return wrapped().fetch(key, timeFrom, timeTo); + } + + @Deprecated + @Override + public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) { + return wrapped().fetch(from, to, timeFrom, timeTo); + } + + @Override + public KeyValueIterator<Windowed<K>, V> all() { + return wrapped().all(); + } + + @Deprecated + @Override + public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) { + return wrapped().fetchAll(timeFrom, timeTo); + } + } + + private static class SessionStoreReadWriteDecorator<K, AGG> extends StateStoreReadWriteDecorator<SessionStore<K, AGG>> implements SessionStore<K, AGG> { + SessionStoreReadWriteDecorator(final SessionStore<K, AGG> inner) { + super(inner); + } + + @Override + public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { + return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime); + } + + @Override + public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { + return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); + } + + @Override + public void remove(final Windowed<K> sessionKey) { + wrapped().remove(sessionKey); + } + + @Override + public void put(final Windowed<K> sessionKey, final AGG aggregate) { + wrapped().put(sessionKey, aggregate); + } + + @Override + public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) { + return wrapped().fetch(key); } @Override public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to) { - return underlying.fetch(from, to); + return wrapped().fetch(from, to); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java index 38f966edabd..570c465f77f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java @@ -41,7 +41,7 @@ abstract class AbstractStateStore implements WrappedStateStore { final StateStore innerState; - AbstractStateStore(final StateStore inner) { + protected AbstractStateStore(final StateStore inner) { this.innerState = inner; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java index fa5f597aa89..f956e0ed5a4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.function.Consumer; @@ -38,8 +39,9 @@ import org.junit.Before; import org.junit.Test; -import static java.util.Collections.emptySet; +import static java.util.Arrays.asList; import static org.easymock.EasyMock.anyLong; +import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.anyString; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; @@ -56,8 +58,14 @@ private static final long VAL = 42L; private static final String STORE_NAME = "underlying-store"; - private boolean initExecuted; - private boolean closeExecuted; + private boolean flushExecuted; + private boolean putExecuted; + private boolean putIfAbsentExecuted; + private boolean putAllExecuted; + private boolean deleteExecuted; + private boolean removeExecuted; + private boolean put3argExecuted; + private KeyValueIterator<String, Long> rangeIter; private KeyValueIterator<String, Long> allIter; @@ -66,6 +74,14 @@ @Before public void setup() { + flushExecuted = false; + putExecuted = false; + putIfAbsentExecuted = false; + putAllExecuted = false; + deleteExecuted = false; + removeExecuted = false; + put3argExecuted = false; + rangeIter = mock(KeyValueIterator.class); allIter = mock(KeyValueIterator.class); windowStoreIter = mock(WindowStoreIterator.class); @@ -82,9 +98,14 @@ public void setup() { final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); - expect(stateManager.getGlobalStore("KeyValueStore")).andReturn(keyValueStoreMock()); - expect(stateManager.getGlobalStore("WindowStore")).andReturn(windowStoreMock()); - expect(stateManager.getGlobalStore("SessionStore")).andReturn(sessionStoreMock()); + expect(stateManager.getGlobalStore("GlobalKeyValueStore")).andReturn(keyValueStoreMock()); + expect(stateManager.getGlobalStore("GlobalWindowStore")).andReturn(windowStoreMock()); + expect(stateManager.getGlobalStore("GlobalSessionStore")).andReturn(sessionStoreMock()); + expect(stateManager.getGlobalStore(anyString())).andReturn(null); + + expect(stateManager.getStore("LocalKeyValueStore")).andReturn(keyValueStoreMock()); + expect(stateManager.getStore("LocalWindowStore")).andReturn(windowStoreMock()); + expect(stateManager.getStore("LocalSessionStore")).andReturn(sessionStoreMock()); replay(stateManager); @@ -98,16 +119,20 @@ public void setup() { mock(ThreadCache.class) ); - context.setCurrentNode(new ProcessorNode<String, Long>("fake", null, emptySet())); + context.setCurrentNode(new ProcessorNode<String, Long>("fake", null, + new HashSet<>(asList("LocalKeyValueStore", "LocalWindowStore", "LocalSessionStore")))); } @Test - public void testKeyValueStore() { - doTest("KeyValueStore", (Consumer<KeyValueStore<String, Long>>) store -> { - checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put"); - checkThrowsUnsupportedOperation(() -> store.putIfAbsent("1", 1L), "putIfAbsent"); - checkThrowsUnsupportedOperation(() -> store.putAll(Collections.emptyList()), "putAll"); - checkThrowsUnsupportedOperation(() -> store.delete("1"), "delete"); + public void globalKeyValueStoreShouldBeReadOnly() { + doTest("GlobalKeyValueStore", (Consumer<KeyValueStore<String, Long>>) store -> { + verifyStoreCannotBeInitializedOrClosed(store); + + checkThrowsUnsupportedOperation(store::flush, "flush()"); + checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put()"); + checkThrowsUnsupportedOperation(() -> store.putIfAbsent("1", 1L), "putIfAbsent()"); + checkThrowsUnsupportedOperation(() -> store.putAll(Collections.emptyList()), "putAll()"); + checkThrowsUnsupportedOperation(() -> store.delete("1"), "delete()"); assertEquals((Long) VAL, store.get(KEY)); assertEquals(rangeIter, store.range("one", "two")); @@ -117,10 +142,13 @@ public void testKeyValueStore() { } @Test - public void testWindowStore() { - doTest("WindowStore", (Consumer<WindowStore<String, Long>>) store -> { - checkThrowsUnsupportedOperation(() -> store.put("1", 1L, 1L), "put"); - checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put"); + public void globalWindowStoreShouldBeReadOnly() { + doTest("GlobalWindowStore", (Consumer<WindowStore<String, Long>>) store -> { + verifyStoreCannotBeInitializedOrClosed(store); + + checkThrowsUnsupportedOperation(store::flush, "flush()"); + checkThrowsUnsupportedOperation(() -> store.put("1", 1L, 1L), "put()"); + checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put()"); assertEquals(iters.get(0), store.fetchAll(0L, 0L)); assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L)); @@ -131,10 +159,13 @@ public void testWindowStore() { } @Test - public void testSessionStore() { - doTest("SessionStore", (Consumer<SessionStore<String, Long>>) store -> { - checkThrowsUnsupportedOperation(() -> store.remove(null), "remove"); - checkThrowsUnsupportedOperation(() -> store.put(null, null), "put"); + public void globalSessionStoreShouldBeReadOnly() { + doTest("GlobalSessionStore", (Consumer<SessionStore<String, Long>>) store -> { + verifyStoreCannotBeInitializedOrClosed(store); + + checkThrowsUnsupportedOperation(store::flush, "flush()"); + checkThrowsUnsupportedOperation(() -> store.remove(null), "remove()"); + checkThrowsUnsupportedOperation(() -> store.put(null, null), "put()"); assertEquals(iters.get(3), store.findSessions(KEY, 1L, 2L)); assertEquals(iters.get(4), store.findSessions(KEY, KEY, 1L, 2L)); @@ -143,6 +174,77 @@ public void testSessionStore() { }); } + @Test + public void localKeyValueStoreShouldNotAllowInitOrClose() { + doTest("LocalKeyValueStore", (Consumer<KeyValueStore<String, Long>>) store -> { + verifyStoreCannotBeInitializedOrClosed(store); + + store.flush(); + assertTrue(flushExecuted); + + store.put("1", 1L); + assertTrue(putExecuted); + + store.putIfAbsent("1", 1L); + assertTrue(putIfAbsentExecuted); + + store.putAll(Collections.emptyList()); + assertTrue(putAllExecuted); + + store.delete("1"); + assertTrue(deleteExecuted); + + assertEquals((Long) VAL, store.get(KEY)); + assertEquals(rangeIter, store.range("one", "two")); + assertEquals(allIter, store.all()); + assertEquals(VAL, store.approximateNumEntries()); + }); + } + + @Test + public void localWindowStoreShouldNotAllowInitOrClose() { + doTest("LocalWindowStore", (Consumer<WindowStore<String, Long>>) store -> { + verifyStoreCannotBeInitializedOrClosed(store); + + store.flush(); + assertTrue(flushExecuted); + + store.put("1", 1L); + assertTrue(putExecuted); + + store.put("1", 1L, 1L); + assertTrue(put3argExecuted); + + assertEquals(iters.get(0), store.fetchAll(0L, 0L)); + assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L)); + assertEquals(iters.get(1), store.fetch(KEY, KEY, 0L, 1L)); + assertEquals((Long) VAL, store.fetch(KEY, 1L)); + assertEquals(iters.get(2), store.all()); + }); + } + + @Test + public void localSessionStoreShouldNotAllowInitOrClose() { + doTest("LocalSessionStore", (Consumer<SessionStore<String, Long>>) store -> { + verifyStoreCannotBeInitializedOrClosed(store); + + store.flush(); + assertTrue(flushExecuted); + + store.remove(null); + assertTrue(removeExecuted); + + store.put(null, null); + assertTrue(putExecuted); + + assertEquals(iters.get(3), store.findSessions(KEY, 1L, 2L)); + assertEquals(iters.get(4), store.findSessions(KEY, KEY, 1L, 2L)); + assertEquals(iters.get(5), store.fetch(KEY)); + assertEquals(iters.get(6), store.fetch(KEY, KEY)); + }); + } + + @SuppressWarnings("unchecked") private KeyValueStore<String, Long> keyValueStoreMock() { final KeyValueStore<String, Long> keyValueStoreMock = mock(KeyValueStore.class); @@ -154,6 +256,31 @@ public void testSessionStore() { expect(keyValueStoreMock.range("one", "two")).andReturn(rangeIter); expect(keyValueStoreMock.all()).andReturn(allIter); + + keyValueStoreMock.put(anyString(), anyLong()); + expectLastCall().andAnswer(() -> { + putExecuted = true; + return null; + }); + + keyValueStoreMock.putIfAbsent(anyString(), anyLong()); + expectLastCall().andAnswer(() -> { + putIfAbsentExecuted = true; + return null; + }); + + keyValueStoreMock.putAll(anyObject(List.class)); + expectLastCall().andAnswer(() -> { + putAllExecuted = true; + return null; + }); + + keyValueStoreMock.delete(anyString()); + expectLastCall().andAnswer(() -> { + deleteExecuted = true; + return null; + }); + replay(keyValueStoreMock); return keyValueStoreMock; @@ -170,11 +297,24 @@ public void testSessionStore() { expect(windowStore.fetch(anyString(), anyLong())).andReturn(VAL); expect(windowStore.all()).andReturn(iters.get(2)); + windowStore.put(anyString(), anyLong()); + expectLastCall().andAnswer(() -> { + putExecuted = true; + return null; + }); + + windowStore.put(anyString(), anyLong(), anyLong()); + expectLastCall().andAnswer(() -> { + put3argExecuted = true; + return null; + }); + replay(windowStore); return windowStore; } + @SuppressWarnings("unchecked") private SessionStore<String, Long> sessionStoreMock() { final SessionStore<String, Long> sessionStore = mock(SessionStore.class); @@ -185,25 +325,31 @@ public void testSessionStore() { expect(sessionStore.fetch(anyString())).andReturn(iters.get(5)); expect(sessionStore.fetch(anyString(), anyString())).andReturn(iters.get(6)); + sessionStore.put(anyObject(Windowed.class), anyLong()); + expectLastCall().andAnswer(() -> { + putExecuted = true; + return null; + }); + + sessionStore.remove(anyObject(Windowed.class)); + expectLastCall().andAnswer(() -> { + removeExecuted = true; + return null; + }); + replay(sessionStore); return sessionStore; } - private void initStateStoreMock(final StateStore windowStore) { - expect(windowStore.name()).andReturn(STORE_NAME); - expect(windowStore.persistent()).andReturn(true); - expect(windowStore.isOpen()).andReturn(true); - - windowStore.init(null, null); - expectLastCall().andAnswer(() -> { - initExecuted = true; - return null; - }); + private void initStateStoreMock(final StateStore stateStore) { + expect(stateStore.name()).andReturn(STORE_NAME); + expect(stateStore.persistent()).andReturn(true); + expect(stateStore.isOpen()).andReturn(true); - windowStore.close(); + stateStore.flush(); expectLastCall().andAnswer(() -> { - closeExecuted = true; + flushExecuted = true; return null; }); } @@ -215,8 +361,6 @@ private void initStateStoreMock(final StateStore windowStore) { public void init(final ProcessorContext context) { final T store = (T) context.getStateStore(name); - checkStateStoreMethods(store); - checker.accept(store); } @@ -235,18 +379,13 @@ public void close() { processor.init(context); } - private void checkStateStoreMethods(final StateStore store) { - checkThrowsUnsupportedOperation(store::flush, "flush"); - + private void verifyStoreCannotBeInitializedOrClosed(final StateStore store) { assertEquals(STORE_NAME, store.name()); assertTrue(store.persistent()); assertTrue(store.isOpen()); - store.init(null, null); - assertTrue(initExecuted); - - store.close(); - assertTrue(closeExecuted); + checkThrowsUnsupportedOperation(() -> store.init(null, null), "init()"); + checkThrowsUnsupportedOperation(store::close, "close()"); } private void checkThrowsUnsupportedOperation(final Runnable check, final String name) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 11050fe6f55..14b94dadab4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -662,11 +662,6 @@ public void init(final ProcessorContext context) { public void process(final String key, final String value) { store.put(key, value); } - - @Override - public void close() { - store.close(); - } } private <K, V> ProcessorSupplier<K, V> define(final Processor<K, V> processor) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Kafka streams lets the user call init() and close() on a state store, when > inside Processors > -------------------------------------------------------------------------------------------- > > Key: KAFKA-6970 > URL: https://issues.apache.org/jira/browse/KAFKA-6970 > Project: Kafka > Issue Type: Bug > Components: streams > Reporter: James Cheng > Assignee: Nikolay Izhikov > Priority: Major > > When using a state store within Transform (and Processor and > TransformValues), the user is able to call init() and close() on the state > stores. Those APIs should only be called by kafka streams itself. > If possible, it would be good to guard those APIs so that the user cannot > call them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)