[ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16881603#comment-16881603
 ] 

Sophie Blee-Goldman commented on KAFKA-8630:
--------------------------------------------

Yep, that's the KIP. There was some ongoing discussion, but I think it got 
dropped on the floor a little while back as people became busy. You might be able 
to kick-start the discussion again with this as motivation: someone on the mailing 
list had a similar problem, but wanted to use key-value stores, so for the time 
being they can get by with just MockProcessorContext. Of course, even though 
some stores don't currently need the InternalProcessorContext, we are 
actively adding more metrics and this may not be true for long. It's worth 
solving this holistically. 

That's probably a lot more work than we need to do here. You're right about the 
StreamsMetrics vs StreamsMetricsImpl – the window store, for example, needs to 
access the storeLevelSensor method, but we don't want to expose that. I agree 
it's pretty annoying that all this boils down to is needing to access some 
internal metrics/sensors, but I do think the right way to solve it is on the 
test side. From the source code side, everything is working as it should – it's 
the MockProcessorContext that fails to adequately 'mock' the real thing. I 
think some light refactoring to have the contexts implement some 
"ProcessorContextMetrics" interface could be accomplished pretty easily.

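To make the refactoring idea concrete, here is a rough, self-contained sketch. 
It does not use the real Kafka classes: Context, ContextMetrics, 
InternalContext, the two mock contexts and the two stores are all hypothetical 
stand-ins, and recordStoreSensor is an invented method, not the actual 
StreamsMetricsImpl API. It only illustrates why a cast to the concrete internal 
type fails for a mock, and how depending on a narrow interface would avoid that.

```java
import java.util.ArrayList;
import java.util.List;

public class ContextMetricsSketch {

    // Hypothetical stand-in for the public context interface.
    interface Context {
        String applicationId();
    }

    // Hypothetical narrow interface, in the spirit of "ProcessorContextMetrics".
    interface ContextMetrics {
        void recordStoreSensor(String storeName, String sensorName);
    }

    // Hypothetical stand-in for the internal (production) context.
    static class InternalContext implements Context, ContextMetrics {
        @Override public String applicationId() { return "app"; }
        @Override public void recordStoreSensor(String storeName, String sensorName) { }
    }

    // Mock as it behaves today in this sketch: public interface only.
    static class PlainMockContext implements Context {
        @Override public String applicationId() { return "test-app"; }
    }

    // Mock after the assumed refactoring: also implements the narrow interface.
    static class MetricsAwareMockContext implements Context, ContextMetrics {
        final List<String> recorded = new ArrayList<>();
        @Override public String applicationId() { return "test-app"; }
        @Override public void recordStoreSensor(String storeName, String sensorName) {
            recorded.add(storeName + "/" + sensorName);
        }
    }

    // Store that casts to the concrete internal type (the failing pattern).
    static class CastingStore {
        void init(Context context) {
            InternalContext internal = (InternalContext) context; // throws for any mock
            internal.recordStoreSensor("my-win-store", "put");
        }
    }

    // Store that only requires the narrow metrics interface.
    static class InterfaceStore {
        void init(Context context) {
            if (!(context instanceof ContextMetrics)) {
                throw new IllegalArgumentException("context does not expose metrics");
            }
            ((ContextMetrics) context).recordStoreSensor("my-win-store", "put");
        }
    }

    // Returns true if CastingStore.init throws ClassCastException for this context.
    static boolean castingStoreFails(Context context) {
        try {
            new CastingStore().init(context);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("cast fails for plain mock: "
            + castingStoreFails(new PlainMockContext()));
        MetricsAwareMockContext mock = new MetricsAwareMockContext();
        new InterfaceStore().init(mock);
        System.out.println("sensors recorded by mock: " + mock.recorded);
    }
}
```

Note that in this sketch the cast still fails for MetricsAwareMockContext, so 
having the mock implement the interface is only half of the change: the store 
would also have to depend on the interface rather than on the concrete internal 
context.
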
> Unit testing a streams processor with a WindowStore throws a 
> ClassCastException
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-8630
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8630
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams-test-utils
>    Affects Versions: 2.3.0
>            Reporter: Justin Fetherolf
>            Priority: Major
>
> I was attempting to write a unit test for a class implementing the 
> {{Processor}} interface that contained a {{WindowStore}}, but running the 
> test fails with a {{ClassCastException}} coming out of 
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to 
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
> public class InMemWindowProcessor implements Processor<String, String> {
>   private ProcessorContext context;
>   private WindowStore<String, String> windowStore;
>   @Override
>   public void init(ProcessorContext context) {
>     this.context = context;
>     windowStore = (WindowStore<String, String>) context.getStateStore("my-win-store");
>   }
>   @Override
>   public void process(String key, String value) {
>   }
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
> public class InMemWindowProcessorTest {
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>   @Before
>   public void setup() {
>     processor = new InMemWindowProcessor();
>     context = new MockProcessorContext();
>     WindowStore<String, String> store =
>       Stores.windowStoreBuilder(
>         Stores.inMemoryWindowStore(
>           "my-win-store",
>           Duration.ofMinutes(10),
>           Duration.ofSeconds(10),
>           false
>         ),
>         Serdes.String(),
>         Serdes.String()
>       )
>       .withLoggingDisabled()
>       .build();
>     store.init(context, store);
>     context.register(store, null);
>     processor.init(context);
>   }
>   @Test
>   public void testThings() {
>     Instant baseTime = Instant.now();
>     context.setTimestamp(baseTime.toEpochMilli());
>     context.setTopic("topic-name");
>     processor.process("key1", "value1");
>   }
> }
> {code}
>  
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: 
> "unix"{noformat}
> And finally the stack trace:
> {noformat}
> -------------------------------------------------------
>  T E S T S
> -------------------------------------------------------
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< 
> FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 
> 0.05 sec  <<< ERROR!
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
>     at 
> org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
>     at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>     at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
>     at 
> com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
>     at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
>     at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
>     at 
> org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
>     at 
> org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
>     at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
>     at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)
> Results :
> Tests in error: 
>   testThings(com.cantgetthistowork.InMemWindowProcessorTest): 
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0{noformat}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
