My Kafka Streams app subscribes to topics that Confluent Connect populates with
changes from our database, does some calculations, and then publishes the
results to its own streams/topics.

When the app starts, I try to get the state store of each stream the app
publishes. The code simply calls KafkaStreams.store inside a try/catch loop,
retrying up to 300 times with a sleep between calls to give the stream time in
case it is rebalancing or the store is truly migrating. This all worked fine
with Kafka 0.10.2.
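Roughly, the lookup loop looks like the sketch below. It is a simplified sketch, not the exact code from the app: the class StoreRetry, its retry method, and the store name in the usage comment are hypothetical. To keep it runnable without Kafka on the classpath, the helper takes the lookup as a Supplier plus the exception type to treat as "not ready yet"; in the real app that type is InvalidStateStoreException and the lookup calls KafkaStreams.store.

```java
import java.util.function.Supplier;

// Hypothetical helper sketching the store-lookup retry loop; not a Kafka API.
// Usage against Kafka Streams 1.1 ("my-store" and the key/value types are hypothetical):
//   ReadOnlyKeyValueStore<String, Long> store = StoreRetry.retry(
//       () -> streams.store("my-store", QueryableStoreTypes.<String, Long>keyValueStore()),
//       InvalidStateStoreException.class, 300, 100L);
final class StoreRetry {

    private StoreRetry() {}

    /**
     * Calls lookup up to maxAttempts times. Exceptions of type transientType are
     * treated as "store not ready yet" and retried after sleeping sleepMs; any
     * other exception is rethrown immediately. If every attempt fails, the last
     * transient exception is rethrown.
     */
    static <T> T retry(Supplier<T> lookup,
                       Class<? extends RuntimeException> transientType,
                       int maxAttempts,
                       long sleepMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (RuntimeException e) {
                if (!transientType.isInstance(e)) {
                    throw e; // a real error, not "not ready yet": fail fast
                }
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(sleepMs); // give a rebalance or migration time to finish
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    throw new IllegalStateException("interrupted while waiting for store", ie);
                }
            }
        }
        throw last; // non-null here: every attempt threw a transient exception
    }
}
```

Catching only the transient exception type keeps genuine errors from being silently retried for all 300 attempts.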

After upgrading to Kafka 1.1.0, the app starts fine the first time. However, if
I restart the app, streams that consume multiple topics from Connect always
throw InvalidStateStoreException. This does not happen for streams that
subscribe to a single Connect topic. The only fix I have found is to delete the
logs and state store and then restart the app.

I debugged into the source a bit and found that the problem comes from this
method in org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider:
    public <T> List<T> stores(final String storeName,
                              final QueryableStoreType<T> queryableStoreType) {
        if (streamThread.state() == StreamThread.State.DEAD) {
            return Collections.emptyList();
        }
        if (!streamThread.isRunningAndNotRebalancing()) {
            throw new InvalidStateStoreException("the state store, " + storeName
                    + ", may have migrated to another instance.");
        }
        final List<T> stores = new ArrayList<>();
        for (Task streamTask : streamThread.tasks().values()) {
            final StateStore store = streamTask.getStore(storeName);
            if (store != null && queryableStoreType.accepts(store)) {
                if (!store.isOpen()) {
                    throw new InvalidStateStoreException("the state store, " + storeName
                            + ", may have migrated to another instance.");
                }
                stores.add((T) store);
            }
        }
        return stores;
    }

For streams that consume multiple Connect topics and produce a single
stream/topic, when I restart the app the code above does not find the store for
the topic the stream is supposed to publish. The store has to exist, since the
app starts and works fine the first time I start it after clearing the logs and
store. Even stranger, despite not finding the store, the stream still receives
the Connect-produced topics and appears to produce the calculated stream just
fine.

Does anyone have any idea what might be happening here after the upgrade?
