vvcephei commented on a change in pull request #11406:
URL: https://github.com/apache/kafka/pull/11406#discussion_r731304471



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1715,4 +1723,93 @@ protected void processStreamThread(final 
Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    public <K, V> StateSerdes<K, V> serdesForStore(final String storeName) {
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store " + storeName + " because no such 
store is registered in the topology."
+            );
+        }
+
+        // TODO this is a hack. We ought to be able to create the serdes 
independent of the
+        // TODO stores and cache them in the topology.
+        final Map<String, StateStore> globalStateStores = 
topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            return getSerdes(store);
+        } else {
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+                    final StateStore store = 
entry.getValue().getStore(storeName);
+                    if (store != null) {
+                        return getSerdes(store);
+                    }
+                }
+            }
+        }
+        // there may be no local copy of this store.
+        // This is the main reason I want to decouble serde
+        // creation from the store itself.
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <V, K> StateSerdes<K, V> getSerdes(final StateStore store) {
+        if (store instanceof MeteredKeyValueStore) {
+            return ((MeteredKeyValueStore<K, V>) store).serdes();
+        } else if (store instanceof MeteredSessionStore) {
+            return ((MeteredSessionStore<K, V>) store).serdes();
+        } else if (store instanceof MeteredWindowStore) {
+            return ((MeteredWindowStore<K, V>) store).serdes();
+        } else {
+            throw new IllegalArgumentException("Unknown store type: " + store);
+        }
+    }
+
+    public <R> InteractiveQueryResult<R> query(final 
InteractiveQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store " + storeName + " because no such 
store is registered in the topology."
+            );
+        }
+        final InteractiveQueryResult<R> result = new 
InteractiveQueryResult<>();
+
+        final Map<String, StateStore> globalStateStores = 
topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            // TODO: a global state store might need to be a different 
interface to accommodate
+            // TODO: multiple partitions
+            final QueryResult<R> r = store.query(request.getQuery(), 
Collections.emptyMap());
+            result.setGlobalResult(r);
+        } else {
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+                    final Map<String, Map<Integer, Long>> 
partitionsAndOffsetBounds =

Review comment:
       Yes, this is just the POC. I'd like to have the stores registered by 
name and partition in the KafkaStreams object so we don't have to do these 
loops. This is a big source of inefficiency in IQ today.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to