vvcephei commented on a change in pull request #11406:
URL: https://github.com/apache/kafka/pull/11406#discussion_r734696949



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1715,4 +1723,93 @@ protected void processStreamThread(final 
Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    public <K, V> StateSerdes<K, V> serdesForStore(final String storeName) {
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store " + storeName + " because no such 
store is registered in the topology."
+            );
+        }
+
+        // TODO this is a hack. We ought to be able to create the serdes 
independent of the
+        // TODO stores and cache them in the topology.
+        final Map<String, StateStore> globalStateStores = 
topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            return getSerdes(store);
+        } else {
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+                    final StateStore store = 
entry.getValue().getStore(storeName);
+                    if (store != null) {
+                        return getSerdes(store);
+                    }
+                }
+            }
+        }
+        // there may be no local copy of this store.
+        // This is the main reason I want to decouble serde
+        // creation from the store itself.
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <V, K> StateSerdes<K, V> getSerdes(final StateStore store) {
+        if (store instanceof MeteredKeyValueStore) {
+            return ((MeteredKeyValueStore<K, V>) store).serdes();
+        } else if (store instanceof MeteredSessionStore) {
+            return ((MeteredSessionStore<K, V>) store).serdes();
+        } else if (store instanceof MeteredWindowStore) {
+            return ((MeteredWindowStore<K, V>) store).serdes();
+        } else {
+            throw new IllegalArgumentException("Unknown store type: " + store);
+        }
+    }
+
+    public <R> InteractiveQueryResult<R> query(final 
InteractiveQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store " + storeName + " because no such 
store is registered in the topology."
+            );
+        }
+        final InteractiveQueryResult<R> result = new 
InteractiveQueryResult<>();
+
+        final Map<String, StateStore> globalStateStores = 
topologyMetadata.globalStateStores();

Review comment:
       Global stores are a totally different beast, actually. They are the 
state for GlobalKTables, which maintains a separate full copy of the table on 
every KafkaStreams instance. They're handled differently as well, which is what 
my todo is about below. Whereas "regular" state stores are always just 1:1 
partition to StateStore instance, global stores hold every partition in one 
store: the global task subscribes to every partition of its input topic and 
just dumps all the data into one store.
   
   The point right here in the logic is just that we're trying to figure out 
whether the specified storeName refers to a global store or a "regular" one, 
because the path to find that store is different.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to