Hello,

Thanks for reporting this issue. Do you know which line fires and throws
the InvalidStateStoreException, since you listed two places here?

1)  if (!streamThread.isRunningAndNotRebalancing())

2) if (!store.isOpen())

From the description that "the above code is not finding the store for the
topic it is supposed to publish (even though it has to exist given the app
starts and works fine the first time i start it after clearing the logs and
store)", I'm not clear which scenario you are referring to.


Also could you paste the full stack trace of the exception so that I can
look into this issue further?
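
In case it helps with capturing the trace, here is a minimal sketch of a
retry loop like the one you describe, written so that the last exception is
printed with its full stack trace and rethrown instead of being swallowed.
The names StoreLookupRetry, retry and fullStackTrace are hypothetical, and
the sketch has no Kafka dependency: in your app the supplier would wrap your
KafkaStreams.store call, and the exception caught would be
InvalidStateStoreException (a RuntimeException). The line number of the
StreamThreadStateStoreProvider.stores frame in the printed trace should then
tell us which of the two checks threw.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.function.Supplier;

class StoreLookupRetry {

    /** Renders the complete stack trace of a throwable as a string. */
    static String fullStackTrace(Throwable t) {
        StringWriter sw = new StringWriter();
        t.printStackTrace(new PrintWriter(sw, true));
        return sw.toString();
    }

    /**
     * Calls lookup up to maxAttempts times, sleeping sleepMs between failed
     * attempts. If every attempt fails, prints the full trace of the last
     * exception and rethrows it unchanged.
     */
    static <T> T retry(Supplier<T> lookup, int maxAttempts, long sleepMs)
            throws InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (RuntimeException e) {
                // Assumption: in the real app this would catch
                // InvalidStateStoreException specifically.
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(sleepMs);
                }
            }
        }
        System.err.println("lookup failed after " + maxAttempts
                + " attempts:\n" + fullStackTrace(last));
        throw last;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the store lookup: fails twice, then succeeds.
        int[] calls = {0};
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("store not ready");
            }
            return "store";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

Rethrowing the original exception, rather than wrapping it, keeps the
frames from inside the Streams library at the top of the trace.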


Guozhang


On Thu, Apr 26, 2018 at 7:29 AM, dizzy0ny <dizzy...@gmail.com> wrote:

> My stream app subscribes to changes from our database via Confluent
> Connect, does some calculation, and then publishes its own stream/topic.
>
> When starting the app, I attempt to get each of the stream stores the app
> publishes. This code simply tries to get the store using the
> KafkaStreams.store method in a try/catch loop (I try up to 300 times with
> a sleep in between calls to give the stream time in case it is rebalancing
> or truly migrating). This all worked fine for Kafka 0.10.2.
>
> After upgrading to Kafka 1.1.0, the app starts fine the first time.
> However, if I try to restart the app, streams that consume multiple
> topics from connect always throw InvalidStateStoreException. This does
> not happen for streams that subscribe to a single connect topic. To fix
> it, I must delete the logs and store, then restart my stream app.
>
> I debugged into the source a bit and found the issue is this call in
> org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider:
> public <T> List<T> stores(final String storeName,
>                           final QueryableStoreType<T> queryableStoreType) {
>     if (streamThread.state() == StreamThread.State.DEAD) {
>         return Collections.emptyList();
>     }
>     if (!streamThread.isRunningAndNotRebalancing()) {
>         throw new InvalidStateStoreException("the state store, " + storeName
>                 + ", may have migrated to another instance.");
>     }
>     final List<T> stores = new ArrayList<>();
>     for (Task streamTask : streamThread.tasks().values()) {
>         final StateStore store = streamTask.getStore(storeName);
>         if (store != null && queryableStoreType.accepts(store)) {
>             if (!store.isOpen()) {
>                 throw new InvalidStateStoreException("the state store, " + storeName
>                         + ", may have migrated to another instance.");
>             }
>             stores.add((T) store);
>         }
>     }
>     return stores;
> }
>
> For streams that consume multiple connect topics and produce a single
> stream/topic, when I restart the app, the above code does not find the
> store for the topic it is supposed to publish (even though it has to exist,
> given that the app starts and works fine the first time I start it after
> clearing the logs and store). What is even stranger, however, is that
> despite not finding a store, it is still receiving the connect-produced
> topics and producing the calculated stream, apparently just fine.
>
> Anyone have any ideas on what might be happening here after the upgrade?




-- 
-- Guozhang
