[ https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16693853#comment-16693853 ]

ASF GitHub Bot commented on KAFKA-7536:
---------------------------------------

guozhangwang closed pull request #5923: KAFKA-7536: Initialize TopologyTestDriver with non-null topic
URL: https://github.com/apache/kafka/pull/5923
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (one opened
from a fork) once it has been merged, the diff is reproduced below for
the sake of provenance:

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 0753b2a8e96..af8b073092b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -33,7 +33,7 @@
 
 public abstract class AbstractProcessorContext implements 
InternalProcessorContext {
 
-    static final String NONEXIST_TOPIC = "__null_topic__";
+    public static final String NONEXIST_TOPIC = "__null_topic__";
     private final TaskId taskId;
     private final String applicationId;
     private final StreamsConfig config;
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 2abfd6354be..a11ae6b8df0 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -325,7 +325,7 @@ public void onRestoreEnd(final TopicPartition 
topicPartition, final String store
                 new LogContext()
             );
             globalStateTask.initialize();
-            globalProcessorContext.setRecordContext(new 
ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
+            globalProcessorContext.setRecordContext(new 
ProcessorRecordContext(0L, -1L, -1, ProcessorContextImpl.NONEXIST_TOPIC, new 
RecordHeaders()));
         } else {
             globalStateManager = null;
             globalStateTask = null;
@@ -352,7 +352,7 @@ public void onRestoreEnd(final TopicPartition 
topicPartition, final String store
             task.initializeStateStores();
             task.initializeTopology();
             context = (InternalProcessorContext) task.context();
-            context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, 
null, new RecordHeaders()));
+            context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, 
ProcessorContextImpl.NONEXIST_TOPIC, new RecordHeaders()));
         } else {
             task = null;
             context = null;
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index bead079ce8d..3e95c731d9e 100644
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -889,6 +889,23 @@ private void flushStore() {
         public void close() {}
     }
 
+    @Test
+    public void shouldAllowPrePopulatingStatesStoresWithCachingEnabled() {
+        final Topology topology = new Topology();
+        topology.addSource("sourceProcessor", "input-topic");
+        topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), 
"sourceProcessor");
+        topology.addStateStore(Stores.keyValueStoreBuilder(
+            Stores.inMemoryKeyValueStore("aggStore"),
+            Serdes.String(),
+            Serdes.Long()).withCachingEnabled(), // intentionally turn on 
caching to achieve better test coverage
+            "aggregator");
+
+        testDriver = new TopologyTestDriver(topology, config);
+
+        store = testDriver.getKeyValueStore("aggStore");
+        store.put("a", 21L);
+    }
+
     @Test
     public void shouldCleanUpPersistentStateStoresOnClose() {
         final Topology topology = new Topology();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> TopologyTestDriver cannot pre-populate KTable or GlobalKTable
> -------------------------------------------------------------
>
>                 Key: KAFKA-7536
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7536
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Dmitry Minkovsky
>            Priority: Minor
>
> I have a GlobalKTable that's defined as
> {code}
> GlobalKTable<String, ByteString> userIdsByEmail = topology          
>    .globalTable(USER_IDS_BY_EMAIL.name,
>                        USER_IDS_BY_EMAIL.consumed(),
>                        Materialized.as("user-ids-by-email"));
> {code}
> And the following test in Spock:
> {code}
>     def topology = // my topology
>     def driver = new TopologyTestDriver(topology, config())
>     def cleanup() {
>         driver.close()
>     }
>     def "create from email request"() {
>         def store = driver.getKeyValueStore('user-ids-by-email')
>         store.put('string', ByteString.copyFrom(new byte[0]))
>         // more, but it fails at the `put` above
> {code}
> When I run this, I get the following:
> {code}
> [2018-10-23 19:35:27,055] INFO (org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl) mock Restoring state for global store user-ids-by-email
> java.lang.NullPointerException
>       at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)
>       at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:237)
>       at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:220)
>       at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)
>       at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
>       at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
>       at pony.message.MessageWriteStreamsTest.create from mailgun email request(MessageWriteStreamsTest.groovy:52)
> [2018-10-23 19:35:27,189] INFO (org.apache.kafka.streams.processor.internals.StateDirectory) stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
> {code}
> The same issue applies to KTable.
> I've noticed that I can {{put()}} to the store if I first write to it with 
> {{driver.pipeInput}}. But otherwise I get the above error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to