[jira] [Commented] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException
[ https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880923#comment-16880923 ] Justin Fetherolf commented on KAFKA-8630: - The `ProcessorContext` interface already offers a `metrics()` method, and `InternalProcessorContext` for some reason overrides it to return a `StreamsMetricsImpl` rather than `StreamsMetrics`. The bit of poking around in the code I've done this evening it seems to be kind of a mixed bag of stores needing just this `metrics()` method (and thus not needing to do the cast) and needing the other methods that `InternalProcessorContext` offers. Is the possibly pre-existing KIP you're thinking of [KIP-448|https://cwiki.apache.org/confluence/x/SAeZBg]? > Unit testing a streams processor with a WindowStore throws a > ClassCastException > --- > > Key: KAFKA-8630 > URL: https://issues.apache.org/jira/browse/KAFKA-8630 > Project: Kafka > Issue Type: Bug > Components: streams-test-utils >Affects Versions: 2.3.0 >Reporter: Justin Fetherolf >Priority: Major > > I was attempting to write a unit test for a class implementing the > {{Processor}} interface that contained a {{WindowStore}}, but running the > test fails with a {{ClassCastException}} coming out of > {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to > {{InternalProcessorContext}}. > Minimal code to reproduce: > {code:java} > package com.cantgetthistowork; > import org.apache.kafka.streams.processor.Processor; > import org.apache.kafka.streams.processor.ProcessorContext; > import org.apache.kafka.streams.state.WindowStore; > public class InMemWindowProcessor implements Processor { > private ProcessorContext context; > private WindowStore windowStore; > @Override > public void init(ProcessorContext context) { > this.context = context; > windowStore = (WindowStore) > context.getStateStore("my-win-store"); > } > @Override > public void process(String key, String value) { > } > @Override > public void close() { > } > } > {code} > {code:java} > package com.cantgetthistowork; > import java.time.Duration; > import java.time.Instant; > import org.apache.kafka.common.serialization.Serdes; > import org.apache.kafka.streams.processor.MockProcessorContext; > import org.apache.kafka.streams.state.Stores; > import org.apache.kafka.streams.state.WindowStore; > import org.junit.Before; > import org.junit.Test; > public class InMemWindowProcessorTest { > InMemWindowProcessor processor = null; > MockProcessorContext context = null; > @Before > public void setup() { > processor = new InMemWindowProcessor(); > context = new MockProcessorContext(); > WindowStore store = > Stores.windowStoreBuilder( > Stores.inMemoryWindowStore( > "my-win-store", > Duration.ofMinutes(10), > Duration.ofSeconds(10), > false > ), > Serdes.String(), > Serdes.String() > ) > .withLoggingDisabled() > .build(); > store.init(context, store); > context.register(store, null); > processor.init(context); > } > @Test > public void testThings() { > Instant baseTime = Instant.now(); > context.setTimestamp(baseTime.toEpochMilli()); > context.setTopic("topic-name"); > processor.process("key1", "value1"); > } > } > {code} > > I was trying this with maven, with mvn --version outputting: > {noformat} > Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; > 2017-04-03T13:39:06-06:00) > Maven home: ~/opt/apache-maven-3.5.0 > Java version: 1.8.0_212, vendor: Oracle Corporation > Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre > Default locale: en_US, platform encoding: UTF-8 > OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: > "unix"{noformat} > And finally the stack trace: > {noformat} > --- > T E S T S > --- > Running com.cantgetthistowork.InMemWindowProcessorTest > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > SLF4J: Defaulting to no-operation (NOP) logger implementation > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further > details. > Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< > FAILURE! > testThings(com.cantgetthistowork.InMemWindowProcessorTest) Time elapsed: > 0.05 sec <<< ERROR! > java.lang.ClassCastException: > org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to > org.apache.kafka.streams.processor.internals.InternalProcessorContext > at > org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91) > at >
[jira] [Commented] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException
[ https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880831#comment-16880831 ] Justin Fetherolf commented on KAFKA-8630: - [~ableegoldman] I personally feel that this is still a problem and shouldn't be closed. Trying to find `InternalMockProcessorContext` revealed another class `MockInternalProcessorContext`, both of which are in test code and not available via the public packages (or at least not in `kafka-streams` or `kafka-streams-test-utils` which seems like the logical place for them). I think the root problem here is not having necessary mock contexts in the proper package, and not having the documentation to support their use cases. > Unit testing a streams processor with a WindowStore throws a > ClassCastException > --- > > Key: KAFKA-8630 > URL: https://issues.apache.org/jira/browse/KAFKA-8630 > Project: Kafka > Issue Type: Bug > Components: streams-test-utils >Affects Versions: 2.3.0 >Reporter: Justin Fetherolf >Priority: Major > > I was attempting to write a unit test for a class implementing the > {{Processor}} interface that contained a {{WindowStore}}, but running the > test fails with a {{ClassCastException}} coming out of > {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to > {{InternalProcessorContext}}. > Minimal code to reproduce: > {code:java} > package com.cantgetthistowork; > import org.apache.kafka.streams.processor.Processor; > import org.apache.kafka.streams.processor.ProcessorContext; > import org.apache.kafka.streams.state.WindowStore; > public class InMemWindowProcessor implements Processor { > private ProcessorContext context; > private WindowStore windowStore; > @Override > public void init(ProcessorContext context) { > this.context = context; > windowStore = (WindowStore) > context.getStateStore("my-win-store"); > } > @Override > public void process(String key, String value) { > } > @Override > public void close() { > } > } > {code} > {code:java} > package com.cantgetthistowork; > import java.time.Duration; > import java.time.Instant; > import org.apache.kafka.common.serialization.Serdes; > import org.apache.kafka.streams.processor.MockProcessorContext; > import org.apache.kafka.streams.state.Stores; > import org.apache.kafka.streams.state.WindowStore; > import org.junit.Before; > import org.junit.Test; > public class InMemWindowProcessorTest { > InMemWindowProcessor processor = null; > MockProcessorContext context = null; > @Before > public void setup() { > processor = new InMemWindowProcessor(); > context = new MockProcessorContext(); > WindowStore store = > Stores.windowStoreBuilder( > Stores.inMemoryWindowStore( > "my-win-store", > Duration.ofMinutes(10), > Duration.ofSeconds(10), > false > ), > Serdes.String(), > Serdes.String() > ) > .withLoggingDisabled() > .build(); > store.init(context, store); > context.register(store, null); > processor.init(context); > } > @Test > public void testThings() { > Instant baseTime = Instant.now(); > context.setTimestamp(baseTime.toEpochMilli()); > context.setTopic("topic-name"); > processor.process("key1", "value1"); > } > } > {code} > > I was trying this with maven, with mvn --version outputting: > {noformat} > Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; > 2017-04-03T13:39:06-06:00) > Maven home: ~/opt/apache-maven-3.5.0 > Java version: 1.8.0_212, vendor: Oracle Corporation > Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre > Default locale: en_US, platform encoding: UTF-8 > OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: > "unix"{noformat} > And finally the stack trace: > {noformat} > --- > T E S T S > --- > Running com.cantgetthistowork.InMemWindowProcessorTest > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > SLF4J: Defaulting to no-operation (NOP) logger implementation > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further > details. > Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< > FAILURE! > testThings(com.cantgetthistowork.InMemWindowProcessorTest) Time elapsed: > 0.05 sec <<< ERROR! > java.lang.ClassCastException: > org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to > org.apache.kafka.streams.processor.internals.InternalProcessorContext > at > org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91) > at >
[jira] [Updated] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException
[ https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justin Fetherolf updated KAFKA-8630: Description: I was attempting to write a unit test for a class implementing the {{Processor}} interface that contained a {{WindowStore}}, but running the test fails with a {{ClassCastException}} coming out of {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to {{InternalProcessorContext}}. Minimal code to reproduce: {code:java} package com.cantgetthistowork; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.WindowStore; public class InMemWindowProcessor implements Processor { private ProcessorContext context; private WindowStore windowStore; @Override public void init(ProcessorContext context) { this.context = context; windowStore = (WindowStore) context.getStateStore("my-win-store"); } @Override public void process(String key, String value) { } @Override public void close() { } } {code} {code:java} package com.cantgetthistowork; import java.time.Duration; import java.time.Instant; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.processor.MockProcessorContext; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; import org.junit.Before; import org.junit.Test; public class InMemWindowProcessorTest { InMemWindowProcessor processor = null; MockProcessorContext context = null; @Before public void setup() { processor = new InMemWindowProcessor(); context = new MockProcessorContext(); WindowStore store = Stores.windowStoreBuilder( Stores.inMemoryWindowStore( "my-win-store", Duration.ofMinutes(10), Duration.ofSeconds(10), false ), Serdes.String(), Serdes.String() ) .withLoggingDisabled() .build(); store.init(context, store); context.register(store, null); processor.init(context); } @Test public void testThings() { Instant baseTime = Instant.now(); context.setTimestamp(baseTime.toEpochMilli()); context.setTopic("topic-name"); processor.process("key1", "value1"); } } {code} I was trying this with maven, with mvn --version outputting: {noformat} Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 2017-04-03T13:39:06-06:00) Maven home: ~/opt/apache-maven-3.5.0 Java version: 1.8.0_212, vendor: Oracle Corporation Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre Default locale: en_US, platform encoding: UTF-8 OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: "unix"{noformat} And finally the stack trace: {noformat} --- T E S T S --- Running com.cantgetthistowork.InMemWindowProcessorTest SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< FAILURE! testThings(com.cantgetthistowork.InMemWindowProcessorTest) Time elapsed: 0.05 sec <<< ERROR! java.lang.ClassCastException: org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to org.apache.kafka.streams.processor.internals.InternalProcessorContext at org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91) at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90) at com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at
[jira] [Created] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException
Justin Fetherolf created KAFKA-8630: --- Summary: Unit testing a streams processor with a WindowStore throws a ClassCastException Key: KAFKA-8630 URL: https://issues.apache.org/jira/browse/KAFKA-8630 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.3.0 Reporter: Justin Fetherolf I was attempting to write a unit test for a class implementing the {{Processor}} interface that contained a {{WindowStore}}, but running the test fails with a {{ClassCastException}} coming out of {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to {{InternalProcessorContext}}. Minimal code to reproduce: {{package com.cantgetthistowork; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.WindowStore; public class InMemWindowProcessor implements Processor \{ private ProcessorContext context; private WindowStore windowStore; @Override public void init(ProcessorContext context) { this.context = context; windowStore = (WindowStore) context.getStateStore("my-win-store"); } @Override public void process(String key, String value) \{ } @Override public void close() \{ } }}} {code:java} package com.cantgetthistowork; import java.time.Duration; import java.time.Instant; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.processor.MockProcessorContext; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; import org.junit.Before; import org.junit.Test; public class InMemWindowProcessorTest { InMemWindowProcessor processor = null; MockProcessorContext context = null; @Before public void setup() { processor = new InMemWindowProcessor(); context = new MockProcessorContext(); WindowStore store = Stores.windowStoreBuilder( Stores.inMemoryWindowStore( "my-win-store", Duration.ofMinutes(10), Duration.ofSeconds(10), false ), Serdes.String(), Serdes.String() ) .withLoggingDisabled() .build(); store.init(context, store); context.register(store, null); processor.init(context); } @Test public void testThings() { Instant baseTime = Instant.now(); context.setTimestamp(baseTime.toEpochMilli()); context.setTopic("topic-name"); processor.process("key1", "value1"); } } {code} I was trying this with maven, with mvn --version outputting: {noformat} Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 2017-04-03T13:39:06-06:00) Maven home: ~/opt/apache-maven-3.5.0 Java version: 1.8.0_212, vendor: Oracle Corporation Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre Default locale: en_US, platform encoding: UTF-8 OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: "unix"{noformat} And finally the stack trace: {noformat} --- T E S T S --- Running com.cantgetthistowork.InMemWindowProcessorTest SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< FAILURE! testThings(com.cantgetthistowork.InMemWindowProcessorTest) Time elapsed: 0.05 sec <<< ERROR! java.lang.ClassCastException: org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to org.apache.kafka.streams.processor.internals.InternalProcessorContext at org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91) at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90) at com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at