vvcephei commented on a change in pull request #11406: URL: https://github.com/apache/kafka/pull/11406#discussion_r734696949
########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -1715,4 +1723,93 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) { return Collections.unmodifiableMap(localStorePartitionLags); } + + public <K, V> StateSerdes<K, V> serdesForStore(final String storeName) { + if (!topologyMetadata.hasStore(storeName)) { + throw new UnknownStateStoreException( + "Cannot get state store " + storeName + " because no such store is registered in the topology." + ); + } + + // TODO this is a hack. We ought to be able to create the serdes independent of the + // TODO stores and cache them in the topology. + final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores(); + if (globalStateStores.containsKey(storeName)) { + final StateStore store = globalStateStores.get(storeName); + return getSerdes(store); + } else { + for (final StreamThread thread : threads) { + final Map<TaskId, Task> tasks = thread.allTasks(); + for (final Entry<TaskId, Task> entry : tasks.entrySet()) { + final StateStore store = entry.getValue().getStore(storeName); + if (store != null) { + return getSerdes(store); + } + } + } + } + // there may be no local copy of this store. + // This is the main reason I want to decouble serde + // creation from the store itself. + return null; + } + + @SuppressWarnings("unchecked") + private <V, K> StateSerdes<K, V> getSerdes(final StateStore store) { + if (store instanceof MeteredKeyValueStore) { + return ((MeteredKeyValueStore<K, V>) store).serdes(); + } else if (store instanceof MeteredSessionStore) { + return ((MeteredSessionStore<K, V>) store).serdes(); + } else if (store instanceof MeteredWindowStore) { + return ((MeteredWindowStore<K, V>) store).serdes(); + } else { + throw new IllegalArgumentException("Unknown store type: " + store); + } + } + + public <R> InteractiveQueryResult<R> query(final InteractiveQueryRequest<R> request) { + final String storeName = request.getStoreName(); + if (!topologyMetadata.hasStore(storeName)) { + throw new UnknownStateStoreException( + "Cannot get state store " + storeName + " because no such store is registered in the topology." + ); + } + final InteractiveQueryResult<R> result = new InteractiveQueryResult<>(); + + final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores(); Review comment: Global stores are a totally different beast, actually. They are the state for GlobalKTables, which maintains a separate full copy of the table on every KafkaStreams instance. They're handled differently as well, which is what my todo is about below. Whereas "regular" state stores are always just 1:1 partition to StateStore instance, global stores hold every partition in one store: the global task subscribes to every partition of its input topic and just dumps all the data into one store. The point right here in the logic is just that we're trying to figure out whether the specified storeName refers to a global store or a "regular" one, because the path to find that store is different. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org