mjsax commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r444541033



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -402,4 +402,9 @@ public TaskType taskType() {
     public Map<TopicPartition, Long> changelogOffsets() {
         return Collections.unmodifiableMap(checkpointFileCache);
     }
+
+    @Override
+    public TopicPartition changelogTopicPartitionFor(final String storeName) {
+        return null;

Review comment:
       Should we throw an exception here instead, since this method should 
never be called for global state stores?
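
To illustrate the suggestion, here is a minimal, self-contained sketch of the fail-fast variant. It does not use the real Kafka classes: the `StateManager` interface and `GlobalManager` class are hypothetical stand-ins, a `String` replaces `TopicPartition`, and the choice of `UnsupportedOperationException` and the message wording are assumptions rather than what the PR ended up with.

```java
public class GlobalStateManagerSketch {

    // Hypothetical stand-in for the state manager interface touched by this PR.
    // A String replaces TopicPartition to keep the sketch dependency-free.
    interface StateManager {
        String changelogTopicPartitionFor(String storeName);
    }

    // Sketch of a global state manager that fails fast instead of returning null.
    static final class GlobalManager implements StateManager {
        @Override
        public String changelogTopicPartitionFor(final String storeName) {
            // Assumed exception type and message; the point is only that an
            // unexpected call surfaces immediately rather than as a later null check.
            throw new UnsupportedOperationException(
                "changelogTopicPartitionFor should never be called for global state store " + storeName);
        }
    }

    public static void main(final String[] args) {
        try {
            new GlobalManager().changelogTopicPartitionFor("my-global-store");
        } catch (final UnsupportedOperationException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Throwing makes a programming error visible at the call site, whereas returning `null` pushes the failure to whichever caller first dereferences or null-checks the result.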

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -118,16 +111,24 @@ public void logChange(final String storeName,
                           final byte[] value,
                           final long timestamp) {
         throwUnsupportedOperationExceptionIfStandby("logChange");
+
+        final TopicPartition changelogPartition = stateManager().changelogTopicPartitionFor(storeName);
+        if (changelogPartition == null) {
+            throw new IllegalStateException("Sending records to state store " + storeName +
+                " which has not been registered.");

Review comment:
       I think the store could still be registered but simply not have a 
changelog topic. Could the error message be misleading in that case?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -578,4 +577,10 @@ private StateStoreMetadata findStore(final TopicPartition changelogPartition) {
 
         return found.isEmpty() ? null : found.get(0);
     }
+
+    @Override
+    public TopicPartition changelogTopicPartitionFor(final String storeName) {
+        final StateStoreMetadata storeMetadata = stores.get(storeName);
+        return storeMetadata == null ? null : storeMetadata.changelogPartition;

Review comment:
       Cf. my comment above: we could throw the exception here if 
`storeMetadata == null`.
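
A minimal sketch of that idea, assuming the lookup is the place to distinguish "store not registered" from "store registered but without a changelog". The `StoreMetadata` class is a hypothetical stand-in for `StateStoreMetadata`, a `String` replaces `TopicPartition`, and the sketch assumes that a `null` `changelogPartition` means the store has no changelog topic.

```java
import java.util.HashMap;
import java.util.Map;

public class ChangelogLookupSketch {

    // Hypothetical stand-in for StateStoreMetadata. Assumption: changelogPartition
    // is null when the store is registered but has no changelog topic.
    static final class StoreMetadata {
        final String changelogPartition;

        StoreMetadata(final String changelogPartition) {
            this.changelogPartition = changelogPartition;
        }
    }

    private final Map<String, StoreMetadata> stores = new HashMap<>();

    public void register(final String storeName, final String changelogPartition) {
        stores.put(storeName, new StoreMetadata(changelogPartition));
    }

    public String changelogTopicPartitionFor(final String storeName) {
        final StoreMetadata storeMetadata = stores.get(storeName);
        if (storeMetadata == null) {
            // Unknown store: raise here, so the message is accurate.
            throw new IllegalStateException("State store " + storeName + " has not been registered.");
        }
        // May still be null for a registered store without a changelog topic;
        // the caller can report that case with its own message.
        return storeMetadata.changelogPartition;
    }

    public static void main(final String[] args) {
        final ChangelogLookupSketch manager = new ChangelogLookupSketch();
        manager.register("logged-store", "app-logged-store-changelog-0");
        manager.register("unlogged-store", null);

        System.out.println(manager.changelogTopicPartitionFor("logged-store"));
        System.out.println(manager.changelogTopicPartitionFor("unlogged-store"));
    }
}
```

With this split, a `null` result seen by the caller in `logChange` can only mean "no changelog topic", so its error message no longer needs to guess whether the store was registered.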

##########
File path: streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
##########
@@ -88,4 +88,14 @@ public StateStore getGlobalStore(final String name) {
     public TaskType taskType() {
         return TaskType.GLOBAL;
     }
+
+    @Override
+    public String changelogFor(final String storeName) {
+        return null;
+    }
+
+    @Override
+    public TopicPartition changelogTopicPartitionFor(final String storeName) {
+        return null;

Review comment:
       As above: should we throw here?





