[jira] [Commented] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException

2019-07-08 Thread Justin Fetherolf (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880923#comment-16880923
 ] 

Justin Fetherolf commented on KAFKA-8630:
-

The `ProcessorContext` interface already offers a `metrics()` method, and 
`InternalProcessorContext` for some reason overrides it to return a 
`StreamsMetricsImpl` rather than `StreamsMetrics`.

From the bit of poking around in the code I've done this evening, it seems to 
be a mixed bag: some stores need just this `metrics()` method (and thus would 
not need the cast), while others need the other methods that 
`InternalProcessorContext` offers.
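
To make the distinction concrete, here is a minimal, self-contained sketch of 
the pattern. None of these types are Kafka classes: `PublicContext`, 
`InternalContext`, and `MockContext` are hypothetical stand-ins for the roles 
played by `ProcessorContext`, `InternalProcessorContext`, and 
`MockProcessorContext`. It only illustrates why a store that casts 
unconditionally fails against a mock, while one that needs just `metrics()` 
would not have to.

```java
// Hypothetical stand-ins for illustration; none of these are Kafka classes.
final class ContextCastSketch {

    /** Plays the role of the public context interface. */
    public interface PublicContext {
        String metrics();
    }

    /** Plays the role of the internal context, which adds extra methods. */
    public interface InternalContext extends PublicContext {
        String internalOnly();
    }

    /** A test mock that implements only the public interface. */
    public static final class MockContext implements PublicContext {
        @Override
        public String metrics() {
            return "mock-metrics";
        }
    }

    /** Store init that casts first, even though it only needs metrics(). */
    public static String initWithCast(PublicContext context) {
        // Fails at runtime for MockContext, which is not an InternalContext.
        return ((InternalContext) context).metrics();
    }

    /** Store init that relies on the public interface only. */
    public static String initWithoutCast(PublicContext context) {
        return context.metrics();
    }

    public static void main(String[] args) {
        PublicContext mock = new MockContext();
        System.out.println(initWithoutCast(mock));
        try {
            initWithCast(mock);
        } catch (ClassCastException e) {
            System.out.println("cast failed: " + e.getClass().getSimpleName());
        }
    }
}
```

Stores in the second group, which really do need the internal methods, would 
still fail against such a mock, so this would only help the `metrics()`-only 
cases.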

Is the possibly pre-existing KIP you're thinking of 
[KIP-448|https://cwiki.apache.org/confluence/x/SAeZBg]?

> Unit testing a streams processor with a WindowStore throws a 
> ClassCastException
> ---
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.0
>Reporter: Justin Fetherolf
>Priority: Major
>
> I was attempting to write a unit test for a class implementing the 
> {{Processor}} interface that contained a {{WindowStore}}, but running the 
> test fails with a {{ClassCastException}} coming out of 
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to 
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
> public class InMemWindowProcessor implements Processor<String, String> {
>   private ProcessorContext context;
>   private WindowStore<String, String> windowStore;
>   @Override
>   @SuppressWarnings("unchecked")
>   public void init(ProcessorContext context) {
>     this.context = context;
>     // Look up the window store that the test registers under this name.
>     windowStore = (WindowStore<String, String>)
>       context.getStateStore("my-win-store");
>   }
>   @Override
>   public void process(String key, String value) {
>   }
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
> public class InMemWindowProcessorTest {
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>   @Before
>   public void setup() {
>     processor = new InMemWindowProcessor();
>     context = new MockProcessorContext();
>     WindowStore<String, String> store =
>       Stores.windowStoreBuilder(
>         Stores.inMemoryWindowStore(
>           "my-win-store",
>           Duration.ofMinutes(10),
>           Duration.ofSeconds(10),
>           false
>         ),
>         Serdes.String(),
>         Serdes.String()
>       )
>       .withLoggingDisabled()
>       .build();
>     store.init(context, store); // the ClassCastException is thrown here
>     context.register(store, null);
>     processor.init(context);
>   }
>   @Test
>   public void testThings() {
>     Instant baseTime = Instant.now();
>     context.setTimestamp(baseTime.toEpochMilli());
>     context.setTopic("topic-name");
>     processor.process("key1", "value1");
>   }
> }
> {code}
>  
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: "unix"
> {noformat}
> And finally the stack trace:
> {noformat}
> ---
>  T E S T S
> ---
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 0.05 sec  <<< ERROR!
> java.lang.ClassCastException: org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to org.apache.kafka.streams.processor.internals.InternalProcessorContext
>     at org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
> at 
> 

[jira] [Commented] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException

2019-07-08 Thread Justin Fetherolf (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880831#comment-16880831
 ] 

Justin Fetherolf commented on KAFKA-8630:
-

[~ableegoldman] I personally feel that this is still a problem and shouldn't be 
closed.

While trying to find `InternalMockProcessorContext` I came across another 
class, `MockInternalProcessorContext`. Both live in test code and are not 
available via the public packages (at least not in `kafka-streams` or 
`kafka-streams-test-utils`, which seem like the logical places for them).

I think the root problem here is not having necessary mock contexts in the 
proper package, and not having the documentation to support their use cases.


[jira] [Updated] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException

2019-07-04 Thread Justin Fetherolf (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justin Fetherolf updated KAFKA-8630:

Description: 
I was attempting to write a unit test for a class implementing the 
{{Processor}} interface that contained a {{WindowStore}}, but running the test 
fails with a {{ClassCastException}} coming out of {{InMemoryWindowStore.init}} 
attempting to cast {{MockProcessorContext}} to {{InternalProcessorContext}}.

Minimal code to reproduce:
{code:java}
package com.cantgetthistowork;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;

public class InMemWindowProcessor implements Processor<String, String> {

  private ProcessorContext context;
  private WindowStore<String, String> windowStore;

  @Override
  @SuppressWarnings("unchecked")
  public void init(ProcessorContext context) {
    this.context = context;

    // Look up the window store that the test registers under this name.
    windowStore = (WindowStore<String, String>)
      context.getStateStore("my-win-store");
  }

  @Override
  public void process(String key, String value) {
  }

  @Override
  public void close() {
  }

}
{code}
{code:java}
package com.cantgetthistowork;

import java.time.Duration;
import java.time.Instant;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.junit.Before;
import org.junit.Test;

public class InMemWindowProcessorTest {

  InMemWindowProcessor processor = null;
  MockProcessorContext context = null;

  @Before
  public void setup() {
    processor = new InMemWindowProcessor();
    context = new MockProcessorContext();

    WindowStore<String, String> store =
      Stores.windowStoreBuilder(
        Stores.inMemoryWindowStore(
          "my-win-store",
          Duration.ofMinutes(10),
          Duration.ofSeconds(10),
          false
        ),
        Serdes.String(),
        Serdes.String()
      )
      .withLoggingDisabled()
      .build();
    store.init(context, store); // the ClassCastException is thrown here
    context.register(store, null);
    processor.init(context);
  }

  @Test
  public void testThings() {
    Instant baseTime = Instant.now();
    context.setTimestamp(baseTime.toEpochMilli());
    context.setTopic("topic-name");
    processor.process("key1", "value1");
  }

}
{code}
 

I was trying this with maven, with mvn --version outputting:
{noformat}
Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 2017-04-03T13:39:06-06:00)
Maven home: ~/opt/apache-maven-3.5.0
Java version: 1.8.0_212, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: "unix"
{noformat}
And finally the stack trace:
{noformat}
---
 T E S T S
---
Running com.cantgetthistowork.InMemWindowProcessorTest
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< FAILURE!
testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 0.05 sec  <<< ERROR!
java.lang.ClassCastException: org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to org.apache.kafka.streams.processor.internals.InternalProcessorContext
    at org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
    at com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at 

[jira] [Created] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException

2019-07-04 Thread Justin Fetherolf (JIRA)
Justin Fetherolf created KAFKA-8630:
---

 Summary: Unit testing a streams processor with a WindowStore 
throws a ClassCastException
 Key: KAFKA-8630
 URL: https://issues.apache.org/jira/browse/KAFKA-8630
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.3.0
Reporter: Justin Fetherolf


I was attempting to write a unit test for a class implementing the 
{{Processor}} interface that contained a {{WindowStore}}, but running the test 
fails with a {{ClassCastException}} coming out of {{InMemoryWindowStore.init}} 
attempting to cast {{MockProcessorContext}} to {{InternalProcessorContext}}.

Minimal code to reproduce:

 

{code:java}
package com.cantgetthistowork;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;

public class InMemWindowProcessor implements Processor<String, String> {

  private ProcessorContext context;
  private WindowStore<String, String> windowStore;

  @Override
  @SuppressWarnings("unchecked")
  public void init(ProcessorContext context) {
    this.context = context;

    // Look up the window store that the test registers under this name.
    windowStore = (WindowStore<String, String>)
      context.getStateStore("my-win-store");
  }

  @Override
  public void process(String key, String value) {
  }

  @Override
  public void close() {
  }

}
{code}

 

 
{code:java}
package com.cantgetthistowork;

import java.time.Duration;
import java.time.Instant;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.junit.Before;
import org.junit.Test;

public class InMemWindowProcessorTest {

  InMemWindowProcessor processor = null;
  MockProcessorContext context = null;

  @Before
  public void setup() {
    processor = new InMemWindowProcessor();
    context = new MockProcessorContext();

    WindowStore<String, String> store =
      Stores.windowStoreBuilder(
        Stores.inMemoryWindowStore(
          "my-win-store",
          Duration.ofMinutes(10),
          Duration.ofSeconds(10),
          false
        ),
        Serdes.String(),
        Serdes.String()
      )
      .withLoggingDisabled()
      .build();
    store.init(context, store); // the ClassCastException is thrown here
    context.register(store, null);
    processor.init(context);
  }

  @Test
  public void testThings() {
    Instant baseTime = Instant.now();
    context.setTimestamp(baseTime.toEpochMilli());
    context.setTopic("topic-name");
    processor.process("key1", "value1");
  }

}
{code}
 

I was trying this with maven, with mvn --version outputting:
{noformat}
Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 2017-04-03T13:39:06-06:00)
Maven home: ~/opt/apache-maven-3.5.0
Java version: 1.8.0_212, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: "unix"
{noformat}
And finally the stack trace:
{noformat}
---
 T E S T S
---
Running com.cantgetthistowork.InMemWindowProcessorTest
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< FAILURE!
testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 0.05 sec  <<< ERROR!
java.lang.ClassCastException: org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to org.apache.kafka.streams.processor.internals.InternalProcessorContext
    at org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
    at com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at