[ 
https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16662413#comment-16662413
 ] 

Dmitry Minkovsky commented on KAFKA-7536:
-----------------------------------------

Thank you.

Yes, disabling cache makes the issue go away.

I tested, and this affects regular KTable as well. Disabling cache makes that 
go away too. 

Not sure how I will work around this, either by abstracting store creation to 
make disabling cache easy for tests, or just by piping input. Looks like it 
will depend on the situation.

> TopologyTestDriver cannot pre-populate KTable
> ---------------------------------------------
>
>                 Key: KAFKA-7536
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7536
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Dmitry Minkovsky
>            Priority: Minor
>
> I have a GlobalKTable that's defined as
> {code}
> GlobalKTable<String, ByteString> userIdsByEmail = topology          
>    .globalTable(USER_IDS_BY_EMAIL.name,
>                        USER_IDS_BY_EMAIL.consumed(),
>                        Materialized.as("user-ids-by-email"));
> {code}
> And the following test in Spock:
> {code}
>     def topology = // my topology
>     def driver = new TopologyTestDriver(topology, config())
>     def cleanup() {
>         driver.close()
>     }
>     def "create from email request"() {
>         def store = driver.getKeyValueStore('user-ids-by-email')
>         store.put('string', ByteString.copyFrom(new byte[0]))
>         // more, but it fails at the `put` above
> {code}
> When I run this, I get the following:
> {code}
> [2018-10-23 19:35:27,055] INFO 
> (org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl) mock 
> Restoring state for global store user-ids-by-email
> java.lang.NullPointerException
>       at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:237)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:220)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)
>       at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
>       at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
>       at pony.message.MessageWriteStreamsTest.create from mailgun email 
> request(MessageWriteStreamsTest.groovy:52)
> [2018-10-23 19:35:27,189] INFO 
> (org.apache.kafka.streams.processor.internals.StateDirectory) stream-thread 
> [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
> {code}
> I've noticed that I can {{put()}} to the store if I first write to it with 
> {{driver.pipeInput}}. But otherwise I get the above error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to