[ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justin Fetherolf updated KAFKA-8630:
------------------------------------
    Description: 
I was attempting to write a unit test for a class implementing the 
{{Processor}} interface that contained a {{WindowStore}}, but running the test 
fails with a {{ClassCastException}} coming out of {{InMemoryWindowStore.init}} 
attempting to cast {{MockProcessorContext}} to {{InternalProcessorContext}}.

Minimal code to reproduce:
{code:java}
package com.cantgetthistowork;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;

public class InMemWindowProcessor implements Processor<String, String> {

  private ProcessorContext context;
  private WindowStore<String, String> windowStore;

  @Override
  public void init(ProcessorContext context) {
    this.context = context;

    windowStore = (WindowStore<String, String>) 
context.getStateStore("my-win-store");
  }

  @Override
  public void process(String key, String value) {
  }

  @Override
  public void close() {
  }

}
{code}
{code:java}
package com.cantgetthistowork;

import java.time.Duration;
import java.time.Instant;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.junit.Before;
import org.junit.Test;

public class InMemWindowProcessorTest {

  InMemWindowProcessor processor = null;
  MockProcessorContext context = null;

  @Before
  public void setup() {
    processor = new InMemWindowProcessor();
    context = new MockProcessorContext();

    WindowStore<String, String> store =
      Stores.windowStoreBuilder(
        Stores.inMemoryWindowStore(
          "my-win-store",
          Duration.ofMinutes(10),
          Duration.ofSeconds(10),
          false
        ),
        Serdes.String(),
        Serdes.String()
      )
      .withLoggingDisabled()
      .build();
    store.init(context, store);
    context.register(store, null);
    processor.init(context);
  }

  @Test
  public void testThings() {
    Instant baseTime = Instant.now();
    context.setTimestamp(baseTime.toEpochMilli());
    context.setTopic("topic-name");
    processor.process("key1", "value1");
  }

}
{code}
 

I was running this with Maven; {{mvn --version}} outputs:
{noformat}
Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
2017-04-03T13:39:06-06:00)
Maven home: ~/opt/apache-maven-3.5.0
Java version: 1.8.0_212, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: 
"unix"{noformat}
And finally the stack trace:
{noformat}
-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running com.cantgetthistowork.InMemWindowProcessorTest
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< 
FAILURE!
testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 0.05 
sec  <<< ERROR!
java.lang.ClassCastException: 
org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
org.apache.kafka.streams.processor.internals.InternalProcessorContext
    at 
org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
    at 
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
    at 
org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
    at 
com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
    at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
    at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
    at 
org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
    at 
org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
    at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)


Results :

Tests in error: 
  testThings(com.cantgetthistowork.InMemWindowProcessorTest): 
org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
org.apache.kafka.streams.processor.internals.InternalProcessorContext

Tests run: 1, Failures: 0, Errors: 1, Skipped: 0{noformat}
 

 

  was:
I was attempting to write a unit test for a class implementing the 
{{Processor}} interface that contained a {{WindowStore}}, but running the test 
fails with a {{ClassCastException}} coming out of {{InMemoryWindowStore.init}} 
attempting to cast {{MockProcessorContext}} to {{InternalProcessorContext}}.

Minimal code to reproduce:

 

{{package com.cantgetthistowork;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;

public class InMemWindowProcessor implements Processor<String, String> \{

  private ProcessorContext context;
  private WindowStore<String, String> windowStore;

  @Override
  public void init(ProcessorContext context) {
    this.context = context;

    windowStore = (WindowStore<String, String>) 
context.getStateStore("my-win-store");
  }

  @Override
  public void process(String key, String value) \{
  }

  @Override
  public void close() \{
  }

}}}

 

 
{code:java}
package com.cantgetthistowork;

import java.time.Duration;
import java.time.Instant;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.junit.Before;
import org.junit.Test;

public class InMemWindowProcessorTest {

  InMemWindowProcessor processor = null;
  MockProcessorContext context = null;

  @Before
  public void setup() {
    processor = new InMemWindowProcessor();
    context = new MockProcessorContext();

    WindowStore<String, String> store =
      Stores.windowStoreBuilder(
        Stores.inMemoryWindowStore(
          "my-win-store",
          Duration.ofMinutes(10),
          Duration.ofSeconds(10),
          false
        ),
        Serdes.String(),
        Serdes.String()
      )
      .withLoggingDisabled()
      .build();
    store.init(context, store);
    context.register(store, null);
    processor.init(context);
  }

  @Test
  public void testThings() {
    Instant baseTime = Instant.now();
    context.setTimestamp(baseTime.toEpochMilli());
    context.setTopic("topic-name");
    processor.process("key1", "value1");
  }

}
{code}
 

I was trying this with maven, with mvn --version outputting:
{noformat}
Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
2017-04-03T13:39:06-06:00)
Maven home: ~/opt/apache-maven-3.5.0
Java version: 1.8.0_212, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: 
"unix"{noformat}
And finally the stack trace:
{noformat}
-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running com.cantgetthistowork.InMemWindowProcessorTest
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< 
FAILURE!
testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 0.05 
sec  <<< ERROR!
java.lang.ClassCastException: 
org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
org.apache.kafka.streams.processor.internals.InternalProcessorContext
    at 
org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
    at 
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
    at 
org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
    at 
com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
    at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
    at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
    at 
org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
    at 
org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
    at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)


Results :

Tests in error: 
  testThings(com.cantgetthistowork.InMemWindowProcessorTest): 
org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
org.apache.kafka.streams.processor.internals.InternalProcessorContext

Tests run: 1, Failures: 0, Errors: 1, Skipped: 0{noformat}
 

 


> Unit testing a streams processor with a WindowStore throws a 
> ClassCastException
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-8630
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8630
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Justin Fetherolf
>            Priority: Major
>
> I was attempting to write a unit test for a class implementing the 
> {{Processor}} interface that contained a {{WindowStore}}, but running the 
> test fails with a {{ClassCastException}} coming out of 
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to 
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
> public class InMemWindowProcessor implements Processor<String, String> {
>   private ProcessorContext context;
>   private WindowStore<String, String> windowStore;
>   @Override
>   public void init(ProcessorContext context) {
>     this.context = context;
>     windowStore = (WindowStore<String, String>) 
> context.getStateStore("my-win-store");
>   }
>   @Override
>   public void process(String key, String value) {
>   }
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
> public class InMemWindowProcessorTest {
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>   @Before
>   public void setup() {
>     processor = new InMemWindowProcessor();
>     context = new MockProcessorContext();
>     WindowStore<String, String> store =
>       Stores.windowStoreBuilder(
>         Stores.inMemoryWindowStore(
>           "my-win-store",
>           Duration.ofMinutes(10),
>           Duration.ofSeconds(10),
>           false
>         ),
>         Serdes.String(),
>         Serdes.String()
>       )
>       .withLoggingDisabled()
>       .build();
>     store.init(context, store);
>     context.register(store, null);
>     processor.init(context);
>   }
>   @Test
>   public void testThings() {
>     Instant baseTime = Instant.now();
>     context.setTimestamp(baseTime.toEpochMilli());
>     context.setTopic("topic-name");
>     processor.process("key1", "value1");
>   }
> }
> {code}
>  
> I was running this with Maven; {{mvn --version}} outputs:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: 
> "unix"{noformat}
> And finally the stack trace:
> {noformat}
> -------------------------------------------------------
>  T E S T S
> -------------------------------------------------------
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< 
> FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 
> 0.05 sec  <<< ERROR!
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
>     at 
> org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
>     at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>     at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
>     at 
> com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
>     at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
>     at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
>     at 
> org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
>     at 
> org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
>     at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
>     at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)
> Results :
> Tests in error: 
>   testThings(com.cantgetthistowork.InMemWindowProcessorTest): 
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0{noformat}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to