[
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16962413#comment-16962413
]
Sophie Blee-Goldman commented on KAFKA-8630:
--------------------------------------------
That's fair, I agree that would be the ideal resolution to this ticket and
something we should do anyways. That said I've been thinking lately this ticket
is actually pretty important as without it users basically have no legitimate
way of unit testing a stateful transformer, processor, etc.
But if we aren't able to fix this issue holistically (ie resolve KAFKA-9109)
by the next release cycle, I would vote to implement the quick and dirty
solution I outlined in KAFKA-9088 just to give users something.
> Unit testing a streams processor with a WindowStore throws a
> ClassCastException
> -------------------------------------------------------------------------------
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
> Issue Type: Bug
> Components: streams-test-utils
> Affects Versions: 2.3.0
> Reporter: Justin Fetherolf
> Priority: Major
>
> I was attempting to write a unit test for a class implementing the
> {{Processor}} interface that contained a {{WindowStore}}, but running the
> test fails with a {{ClassCastException}} coming out of
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
> public class InMemWindowProcessor implements Processor<String, String> {
> private ProcessorContext context;
> private WindowStore<String, String> windowStore;
> @Override
> public void init(ProcessorContext context) {
> this.context = context;
> windowStore = (WindowStore<String, String>)
> context.getStateStore("my-win-store");
> }
> @Override
> public void process(String key, String value) {
> }
> @Override
> public void close() {
> }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
> public class InMemWindowProcessorTest {
> InMemWindowProcessor processor = null;
> MockProcessorContext context = null;
> @Before
> public void setup() {
> processor = new InMemWindowProcessor();
> context = new MockProcessorContext();
> WindowStore<String, String> store =
> Stores.windowStoreBuilder(
> Stores.inMemoryWindowStore(
> "my-win-store",
> Duration.ofMinutes(10),
> Duration.ofSeconds(10),
> false
> ),
> Serdes.String(),
> Serdes.String()
> )
> .withLoggingDisabled()
> .build();
> store.init(context, store);
> context.register(store, null);
> processor.init(context);
> }
> @Test
> public void testThings() {
> Instant baseTime = Instant.now();
> context.setTimestamp(baseTime.toEpochMilli());
> context.setTopic("topic-name");
> processor.process("key1", "value1");
> }
> }
> {code}
>
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426;
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family:
> "unix"{noformat}
> And finally the stack trace:
> {noformat}
> -------------------------------------------------------
> T E S T S
> -------------------------------------------------------
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<<
> FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest) Time elapsed:
> 0.05 sec <<< ERROR!
> java.lang.ClassCastException:
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
> at
> org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
> at
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
> at
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
> at
> com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
> at
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
> at
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
> at
> org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
> at
> org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
> at
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
> at
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)
> Results :
> Tests in error:
> testThings(com.cantgetthistowork.InMemWindowProcessorTest):
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0{noformat}
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)