[ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880886#comment-16880886
 ] 

Sophie Blee-Goldman commented on KAFKA-8630:
--------------------------------------------

You're right, I overlooked that InternalMockProcessorContext is not part of the 
public API and definitely shouldn't be used. In general users are encouraged to 
do their testing through the TopologyTestDriver. You can access state stores 
through it and they will be managed for you so you don't have to worry about 
calling init or close. (This is especially important with the persistent 
variants of the stores as they will leak memory if not properly closed.)

That said, I can see how this would be useful even if not necessary. If you are 
interested in picking this up, I'd be happy to help! Just to outline the 
problem clearly, along with some possible solutions:

This affects all window and session stores, and may affect key-value stores in 
the future should we chose to add metrics.The problem is that the stores cast 
to InternalMockProcessorContext in order to access the  StreamsMetrics, but the 
MockProcessorContext intended for testing does not extend this class. I don't 
think it's ideal to just add "if" guards everywhere this internal context or 
the related metrics/sensors are used in the actual streams code just to make 
room for some test code.

 One possibility would be to have the Internal and Mock processor contexts' 
implement a common interface that has a `metrics()` method, and just return a 
dummy metrics in the Mock case. Alternatively we could add some kind of Mock 
stores that can hide the internals and just present a stable interface for 
testing – either way we would need a KIP. Although I believe there already was 
a KIP regarding mock stores, maybe this could be a part of that

> Unit testing a streams processor with a WindowStore throws a 
> ClassCastException
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-8630
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8630
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams-test-utils
>    Affects Versions: 2.3.0
>            Reporter: Justin Fetherolf
>            Priority: Major
>
> I was attempting to write a unit test for a class implementing the 
> {{Processor}} interface that contained a {{WindowStore}}, but running the 
> test fails with a {{ClassCastException}} coming out of 
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to 
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
> public class InMemWindowProcessor implements Processor<String, String> {
>   private ProcessorContext context;
>   private WindowStore<String, String> windowStore;
>   @Override
>   public void init(ProcessorContext context) {
>     this.context = context;
>     windowStore = (WindowStore<String, String>) 
> context.getStateStore("my-win-store");
>   }
>   @Override
>   public void process(String key, String value) {
>   }
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
> public class InMemWindowProcessorTest {
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>   @Before
>   public void setup() {
>     processor = new InMemWindowProcessor();
>     context = new MockProcessorContext();
>     WindowStore<String, String> store =
>       Stores.windowStoreBuilder(
>         Stores.inMemoryWindowStore(
>           "my-win-store",
>           Duration.ofMinutes(10),
>           Duration.ofSeconds(10),
>           false
>         ),
>         Serdes.String(),
>         Serdes.String()
>       )
>       .withLoggingDisabled()
>       .build();
>     store.init(context, store);
>     context.register(store, null);
>     processor.init(context);
>   }
>   @Test
>   public void testThings() {
>     Instant baseTime = Instant.now();
>     context.setTimestamp(baseTime.toEpochMilli());
>     context.setTopic("topic-name");
>     processor.process("key1", "value1");
>   }
> }
> {code}
>  
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: 
> "unix"{noformat}
> And finally the stack trace:
> {noformat}
> -------------------------------------------------------
>  T E S T S
> -------------------------------------------------------
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< 
> FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 
> 0.05 sec  <<< ERROR!
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
>     at 
> org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
>     at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>     at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
>     at 
> com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
>     at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
>     at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
>     at 
> org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
>     at 
> org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
>     at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
>     at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)
> Results :
> Tests in error: 
>   testThings(com.cantgetthistowork.InMemWindowProcessorTest): 
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0{noformat}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to