Well lets say previous thread was processing a large block and while
processing it got bumped out.
So new thread which got this partition may have different offset when it
starts the restore and by the time it completes the restore.

If that is the case then it should just ignore that task for that cycle and
move on to next task.
When it returns again it can again try to restore the state and if by that
time there is no other thread processing the partition it can start
processing.

I see no reason to raise the exception and kill the thread entirely.

Thanks
Sachin


On Mon, Mar 27, 2017 at 3:56 PM, Damian Guy <damian....@gmail.com> wrote:

> Yes, but we don't know why it is still processing the data. We don't want
> to have multiple processes acting on the same tasks, hence the exception.
> What if for some reason the other task is processing a large backlog, how
> long do we wait before we give up?
>
> I think in this case the exception is the right thing to do
>
> On Mon, 27 Mar 2017 at 09:24 Sachin Mittal <sjmit...@gmail.com> wrote:
>
> Hi,
> These are the logs
> https://www.dropbox.com/s/2t4ysfdqbtmcusq/complete_84_85_87_log.zip?dl=0
>
> I think this may not always be the case especially if previous owner is on
> a different machine.
>
> Say it is processing and it takes more than the poll timeout to process or
> commit the offset.
>
> The group bumps this thread and assigns its task to a different thread on
> maybe a different machine.
>
> All this while this client may be pushing the changelog data and other
> thread restoring the state from the same partition.
>
> So between the time it starts and it seeks to the end of the changelog it
> may be possible that previous thread which was still in process since it
> did not know it got bumped out added some more data to that changelog.
>
> Only when previous thread tries to commit the offset it gets to know that
> it is no longer the owner of the partition and then issues a rejoin
> request.
>
> I think in such a case should be handled within streams application.
>
> Thanks
> Sachin
>
>
>
>
>
> On Mon, Mar 27, 2017 at 1:25 PM, Damian Guy <damian....@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > This should not happen. The previous owner of the task should have
> stopped
> > processing before the restoration begins. So if this is happening, then
> > that signals a bug. Do you have more logs?
> >
> > Thanks,
> > Damian
> >
> > On Sat, 25 Mar 2017 at 08:20 Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > > Hi,
> > > So recently we fixed the deadlock issue and also built the streams jar
> by
> > > copying the rocks db configs from trunk.
> > > So we don't get any deadlock issue now and also we see that the wait
> time
> > > of CPU cores stays around 5% (down from 50% earlier).
> > >
> > > However we now get a new exception which is not handled by streams
> > > application and causes the instance to shutdown.
> > >
> > > org.apache.kafka.streams.errors.StreamsException: stream-thread
> > > [StreamThread-2] Failed to rebalance
> > >     at
> > >
> > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:622)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > >     at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:378)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > Caused by: java.lang.IllegalStateException: task [0_9] Log end offset
> of
> > > new-part-advice-key-table-changelog-9 should not change while
> restoring:
> > > old end offset 647352, current offset 647632
> > >     at
> > >
> > > org.apache.kafka.streams.processor.internals.ProcessorStateManager.
> > restoreActiveState(ProcessorStateManager.java:240)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > >     at
> > >
> > > org.apache.kafka.streams.processor.internals.ProcessorStateManager.
> > register(ProcessorStateManager.java:193)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > >     at
> > >
> > > org.apache.kafka.streams.processor.internals.AbstractProcessorContext.
> > register(AbstractProcessorContext.java:99)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > >     at
> > >
> > > org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.
> > init(RocksDBSegmentedBytesStore.java:101)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > >     at
> > >
> > > org.apache.kafka.streams.state.internals.
> ChangeLoggingSegmentedBytesSto
> > re.init(ChangeLoggingSegmentedBytesStore.java:68)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > >     at
> > >
> > > org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.
> > init(MeteredSegmentedBytesStore.java:66)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > >     at
> > >
> > > org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(
> > RocksDBWindowStore.java:76)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > >     at
> > >
> > > org.apache.kafka.streams.processor.internals.AbstractTask.
> > initializeStateStores(AbstractTask.java:86)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > >     at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > StreamTask.<init>(StreamTask.java:141)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > >
> > > What I check from logs is this
> > > DEBUG 2017-03-25 02:07:24,499 [StreamThread-2]:
> > > org.apache.kafka.streams.processor.internals.StreamThread -
> > stream-thread
> > > [StreamThread-2] creating new task 0_9
> > > So it creates the task at this time.
> > >
> > > To create the local state store from the chnagelog topic it starts at
> > >
> > > DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> > > org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to
> > > partition(s): new-part-advice-key-table-changelog-9
> > > DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> > > org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to end of
> > > partition new-part-advice-key-table-changelog-9
> > > DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> > > org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset
> > for
> > > partition new-part-advice-key-table-changelog-9 to latest offset.
> > > DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]:
> > > org.apache.kafka.clients.consumer.internals.Fetcher - Handling
> > > ListOffsetResponse response for new-part-advice-key-table-changelog-9.
> > > Fetched offset 647352, timestamp -1
> > > DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]:
> > > org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to beginning
> > of
> > > partition new-part-advice-key-table-changelog-9
> > > DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]:
> > > org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset
> > for
> > > partition new-part-advice-key-table-changelog-9 to earliest offset.
> > >
> > > and process is over at
> > > DEBUG 2017-03-25 02:10:21,225 [StreamThread-2]:
> > > org.apache.kafka.clients.consumer.internals.Fetcher - Sending fetch
> for
> > > partitions [new-part-advice-key-table-changelog-9] to broker
> > > 192.168.73.199:9092 (id: 5 rack: null)
> > > DEBUG 2017-03-25 02:10:21,230 [StreamThread-2]:
> > > org.apache.kafka.clients.consumer.KafkaConsumer - Unsubscribed all
> > topics
> > > or patterns and assigned partitions
> > >
> > > And the exception is thrown at:
> > > ERROR 2017-03-25 02:10:21,232 [StreamThread-2]:
> > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - User
> > > provided listener
> > > org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> > > new-part-advice failed on partition assignment
> > > java.lang.IllegalStateException: task [0_9] Log end offset of
> > > new-part-advice-key-table-changelog-9 should not change while
> restoring:
> > > old end offset 647352, current offset 647632
> > >
> > > So you can clearly see that while restoring the state some other thread
> > (on
> > > same or other instance) did commit some more offset for this change
> long
> > > partition so in the end of the process two offsets did not match. I
> think
> > > this is fairly a reasonable scenario and while restoring the state it
> > > should also consider any added offsets and not assume that this is the
> > only
> > > thread processing that partition. It may have been some other instance
> > did
> > > commit some more offsets while this thread is trying to restore the
> > state.
> > >
> > > So I feel this exception should be handled and not thrown all the way
> to
> > > the streams.
> > >
> > > What do you all think.
> > >
> > > Thanks
> > > Sachin
> > >
> >
>

Reply via email to