vvcephei commented on a change in pull request #11406: URL: https://github.com/apache/kafka/pull/11406#discussion_r731304471
########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -1715,4 +1723,93 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) { return Collections.unmodifiableMap(localStorePartitionLags); } + + public <K, V> StateSerdes<K, V> serdesForStore(final String storeName) { + if (!topologyMetadata.hasStore(storeName)) { + throw new UnknownStateStoreException( + "Cannot get state store " + storeName + " because no such store is registered in the topology." + ); + } + + // TODO this is a hack. We ought to be able to create the serdes independent of the + // TODO stores and cache them in the topology. + final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores(); + if (globalStateStores.containsKey(storeName)) { + final StateStore store = globalStateStores.get(storeName); + return getSerdes(store); + } else { + for (final StreamThread thread : threads) { + final Map<TaskId, Task> tasks = thread.allTasks(); + for (final Entry<TaskId, Task> entry : tasks.entrySet()) { + final StateStore store = entry.getValue().getStore(storeName); + if (store != null) { + return getSerdes(store); + } + } + } + } + // there may be no local copy of this store. + // This is the main reason I want to decouple serde + // creation from the store itself. 
+ return null; + } + + @SuppressWarnings("unchecked") + private <V, K> StateSerdes<K, V> getSerdes(final StateStore store) { + if (store instanceof MeteredKeyValueStore) { + return ((MeteredKeyValueStore<K, V>) store).serdes(); + } else if (store instanceof MeteredSessionStore) { + return ((MeteredSessionStore<K, V>) store).serdes(); + } else if (store instanceof MeteredWindowStore) { + return ((MeteredWindowStore<K, V>) store).serdes(); + } else { + throw new IllegalArgumentException("Unknown store type: " + store); + } + } + + public <R> InteractiveQueryResult<R> query(final InteractiveQueryRequest<R> request) { + final String storeName = request.getStoreName(); + if (!topologyMetadata.hasStore(storeName)) { + throw new UnknownStateStoreException( + "Cannot get state store " + storeName + " because no such store is registered in the topology." + ); + } + final InteractiveQueryResult<R> result = new InteractiveQueryResult<>(); + + final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores(); + if (globalStateStores.containsKey(storeName)) { + final StateStore store = globalStateStores.get(storeName); + // TODO: a global state store might need to be a different interface to accommodate + // TODO: multiple partitions + final QueryResult<R> r = store.query(request.getQuery(), Collections.emptyMap()); + result.setGlobalResult(r); + } else { + for (final StreamThread thread : threads) { + final Map<TaskId, Task> tasks = thread.allTasks(); + for (final Entry<TaskId, Task> entry : tasks.entrySet()) { + final Map<String, Map<Integer, Long>> partitionsAndOffsetBounds = Review comment: Yes, this is just the POC. I'd like to have the stores registered by name and partition in the KafkaStreams object so we don't have to do these loops. This is a big source of inefficiency in IQ today. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org