My Kafka Streams app subscribes to changes from our database (captured with Confluent Connect), does some calculations, and then publishes the results to its own streams/topics.
When the app starts, I try to get each of the state stores the app publishes. The code simply calls the KafkaStreams.store method in a try/catch loop. I try up to 300 times, with a sleep between calls, to give the stream time in case it is rebalancing or the store has truly migrated. This all worked fine with Kafka 0.10.2.
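For reference, here is a minimal sketch of the kind of retry loop described above. It assumes a generic helper: StoreRetry and retry are hypothetical names, not Kafka API. The helper retries only on the exception type it is given, sleeps between attempts, and rethrows the last failure once the attempts run out.

```java
import java.util.function.Supplier;

// Hypothetical helper (not part of Kafka). Intended usage, assuming a running
// KafkaStreams instance "streams" and a hypothetical store name and interval:
//   ReadOnlyKeyValueStore<String, Long> store = StoreRetry.retry(
//       () -> streams.store("my-store", QueryableStoreTypes.<String, Long>keyValueStore()),
//       InvalidStateStoreException.class, 300, 1000L);
final class StoreRetry {
    private StoreRetry() {}

    // Calls lookup up to maxAttempts times, sleeping sleepMillis between attempts.
    // Only exceptions of type retryOn are retried; anything else is rethrown at once.
    static <T> T retry(Supplier<T> lookup,
                       Class<? extends RuntimeException> retryOn,
                       int maxAttempts,
                       long sleepMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (RuntimeException e) {
                if (!retryOn.isInstance(e)) {
                    throw e; // not a "store not ready" error: fail fast
                }
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(sleepMillis);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt(); // preserve interrupt status
                        throw new IllegalStateException("interrupted while waiting for store", ie);
                    }
                }
            }
        }
        throw last; // every attempt failed with a retryable exception
    }
}
```

Restricting retries to one exception type keeps genuine bugs (for example, a wrong store name passed with the wrong store type) from being hidden behind 300 silent retries.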
After upgrading to Kafka 1.1.0, the app starts fine the first time. However, if I restart the app, every stream that consumes multiple topics from Connect always throws InvalidStateStoreException. This does not happen for streams that subscribe to a single Connect topic. To fix it, I have to delete the logs and the state store and then restart the app.
I debugged into the source a bit and found that the exception is thrown from this method in org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider:
```java
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
    if (streamThread.state() == StreamThread.State.DEAD) {
        return Collections.emptyList();
    }
    if (!streamThread.isRunningAndNotRebalancing()) {
        throw new InvalidStateStoreException("the state store, " + storeName
                + ", may have migrated to another instance.");
    }
    final List<T> stores = new ArrayList<>();
    for (Task streamTask : streamThread.tasks().values()) {
        final StateStore store = streamTask.getStore(storeName);
        if (store != null && queryableStoreType.accepts(store)) {
            if (!store.isOpen()) {
                throw new InvalidStateStoreException("the state store, " + storeName
                        + ", may have migrated to another instance.");
            }
            stores.add((T) store);
        }
    }
    return stores;
}
```
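Reading the method as quoted, a lookup on a thread that is not DEAD fails in only two places: when the thread is not running (or is rebalancing), and when a matching store is found but is closed. A task that simply lacks the store is skipped without an exception. Both branches throw the same "may have migrated to another instance" message, so the message alone doesn't say which one fired. The Kafka-free model below is hypothetical (StoreLookupModel, TaskStub, StoreUnavailable and the three-value thread state are illustrative names, not Kafka API) and only makes those outcomes explicit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Kafka-free model of the branches in the quoted stores() method.
// Simplifications: three thread states, and the store-type check is omitted.
final class StoreLookupModel {
    private StoreLookupModel() {}

    // RUNNING stands for "running and not rebalancing".
    enum ThreadState { RUNNING, NOT_RUNNING, DEAD }

    // Stand-in for a task: storeOpen == null means the task has no such store.
    static final class TaskStub {
        final Boolean storeOpen;
        TaskStub(Boolean storeOpen) { this.storeOpen = storeOpen; }
    }

    // Stand-in for InvalidStateStoreException.
    static final class StoreUnavailable extends RuntimeException {
        StoreUnavailable(String reason) { super(reason); }
    }

    // Convenience factory: one TaskStub per argument.
    static List<TaskStub> tasks(Boolean... storeOpen) {
        List<TaskStub> result = new ArrayList<>();
        for (Boolean open : storeOpen) {
            result.add(new TaskStub(open));
        }
        return result;
    }

    // Returns the number of open stores found, or throws where the original throws.
    static int lookup(ThreadState state, List<TaskStub> tasks) {
        if (state == ThreadState.DEAD) {
            return 0; // dead thread: empty result, no exception
        }
        if (state != ThreadState.RUNNING) {
            throw new StoreUnavailable("thread not running, or rebalancing");
        }
        int found = 0;
        for (TaskStub task : tasks) {
            if (task.storeOpen == null) {
                continue; // task lacks the store: skipped silently
            }
            if (!task.storeOpen) {
                throw new StoreUnavailable("store found but closed");
            }
            found++;
        }
        return found;
    }
}
```

Logging which of the two throw sites is reached (or checking the thread state and store.isOpen() separately while debugging) would narrow down whether the restarted app is stuck rebalancing or holding a closed store.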
For streams that consume multiple Connect topics and produce a single stream/topic, when I restart the app, the stores() method above does not find the store for the topic the stream is supposed to publish. The store must exist, though: the app starts and works fine the first time I run it after clearing the logs and store. Stranger still, even though the store is not found, the stream keeps receiving the Connect-produced topics and appears to produce the calculated stream just fine.
Does anyone have an idea what might be happening here after the upgrade?