Hey Alex,

Huh.

Unprefixed configs apply to all the consumers Streams creates, but in
this case it makes no difference: only the "main" consumer participates
in group management, so max.poll.interval.ms only matters for the main
consumer anyway.
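
For anyone following along, here's a sketch of how the config prefixes
work. The prefix strings ("main.consumer." and "restore.consumer.") are
the documented ones; the particular values below are just illustrative:

```java
import java.util.Properties;

public class StreamsConsumerPrefixes {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Unprefixed: picked up by every consumer Streams creates,
        // unless a prefixed override exists for a given consumer.
        props.put("max.poll.interval.ms", String.valueOf(Integer.MAX_VALUE));
        // "main.consumer." targets only the main consumer, which is the
        // one that does group management. (300000 is just an example.)
        props.put("main.consumer.max.poll.interval.ms", "300000");
        // "restore.consumer." targets only the restore consumer; smaller
        // batches let the restore loop get back to the main consumer's
        // poll() sooner. (100 is just an example.)
        props.put("restore.consumer.max.poll.records", "100");
        System.out.println(props.getProperty("main.consumer.max.poll.interval.ms"));
    }
}
```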

So you actually have max.poll.interval.ms set to Integer.MAX_VALUE,
which amounts to roughly 25 days? I agree, in that case it doesn't seem
like it could be a slow batch. In fact, it couldn't be anything related
to polling at all, since you see rebalances far sooner than 25 days.
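
For reference, the arithmetic behind that ~25-day figure:

```java
import java.util.concurrent.TimeUnit;

public class MaxPollIntervalMath {
    public static void main(String[] args) {
        // max.poll.interval.ms = Integer.MAX_VALUE is 2147483647 ms,
        // which is a little under 25 days.
        long days = TimeUnit.MILLISECONDS.toDays(Integer.MAX_VALUE);
        System.out.println(days); // prints 24 (i.e., ~24.9 days)
    }
}
```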

If you have the broker logs, they'll contain the reason for the rebalance.
The only other thing I can think of that causes rebalances is failing to
heartbeat. What do you have for session.timeout.ms and
heartbeat.interval.ms?
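
For context, here's a sketch of how those two settings relate. The
values shown are the Kafka 2.4 defaults, and the 1/3 ratio is the usual
guidance from the consumer docs:

```java
public class HeartbeatSettings {
    public static void main(String[] args) {
        // If the broker sees no heartbeat within session.timeout.ms, it
        // evicts the member and triggers a rebalance, independently of
        // max.poll.interval.ms.
        int sessionTimeoutMs = 10_000;   // session.timeout.ms (2.4 default)
        int heartbeatIntervalMs = 3_000; // heartbeat.interval.ms (2.4 default)
        // Usual guidance: heartbeat at least ~3x per session timeout, so
        // a couple of dropped heartbeats don't evict the member.
        boolean withinGuidance = heartbeatIntervalMs <= sessionTimeoutMs / 3;
        System.out.println(withinGuidance); // prints true
    }
}
```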

If anyone else has any ideas, please jump in.

Thanks,
-John

On Fri, Apr 10, 2020, at 14:55, Alex Craig wrote:
> Thanks John, I double-checked my configs and I've actually got the
> max.poll.interval.ms set to the max (not prefixed with anything so
> presumably that’s the “main” consumer).  So I think that means the problem
> isn’t due to a single batch of messages not getting processed/committed
> within the polling cycle, right?  I guess what I’m wondering is: could the
> OVERALL length of time needed to fully restore the state stores (which
> could be multiple topics with multiple partitions) be exceeding some
> timeout or threshold?  Thanks again for any ideas,
> 
> Alex C
> 
> On Thu, Apr 9, 2020 at 9:36 AM John Roesler <vvcep...@apache.org> wrote:
> 
> > Hi Alex,
> >
> > It sounds like your theory is plausible. After a rebalance, Streams needs
> > to restore its stores from the changelog topics. Currently, Streams
> > performs this restore operation in the same loop that does processing and
> > polls the consumer for more records. If the restore batches (or the
> > processing) take too long, Streams won’t be able to call Consumer#poll (on
> > the “main” consumer) within the max.poll.interval, which causes the
> > Consumer’s heartbeat thread to assume the instance is unhealthy and stop
> > sending heartbeats, which in turn causes another rebalance.
> >
> > You could try either adjusting the max poll interval for the _main_
> > consumer or decreasing the batch size for the _restore_ consumer to make
> > sure Streams can call poll() frequently enough to stay in the group. There
> > are prefixes you can add to the consumer configuration portions to target
> > the main or restore consumer.
> >
> > Also worth noting, we’re planning to change this up pretty soon, so that
> > restoration happens in a separate thread and doesn’t block polling like
> > this.
> >
> > I hope this helps!
> > -John
> >
> > On Thu, Apr 9, 2020, at 08:33, Alex Craig wrote:
> > > Hi all, I’ve got a Kafka Streams application running in a Kubernetes
> > > environment.  The topology on this application has 2 aggregations (and
> > > therefore 2 Ktables), both of which can get fairly large – the first is
> > > around 200GB and the second around 500GB.  As with any K8s platform,
> > > pods can occasionally get rescheduled or go down, which of course will
> > > cause my application to rebalance.  However, what I’m seeing is the
> > > application will literally spend hours rebalancing, without any errors
> > > being thrown or other obvious causes for the frequent rebalances – all
> > > I can see in the logs is an instance will be restoring a state store
> > > from the changelog topic, then suddenly it will have its partitions
> > > revoked and begin the join-group process all over again.  (I’m running
> > > 10 pods/instances of my app, and I see this same pattern in each
> > > instance)  In some cases it never really recovers from this
> > > rebalancing cycle – even after 12 hours or more - and I’ve had to
> > > scale down the application completely and start over by purging the
> > > application state and re-consuming from earliest on the source topic.
> > > Interestingly, after purging and starting from scratch the application
> > > seems to recover from rebalances pretty easily.
> > >
> > > The storage I’m using is a NAS device, which admittedly is not
> > > particularly fast.  (it’s using spinning disks and is shared amongst
> > > other tenants) As an experiment, I’ve tried switching the k8s storage
> > > to an in-memory option (this is at the k8s layer - the application is
> > > still using the same RocksDB stores) to see if that helps.  As it
> > > turns out, I never have the rebalance problem when using an in-memory
> > > persistence layer.  If a pod goes down, the application spends around
> > > 10 - 15 minutes rebalancing and then is back to processing data again.
> > >
> > > At this point I guess my main question is: when I’m using the NAS
> > > storage and the state stores are fairly large, could I be hitting some
> > > timeout somewhere that isn’t allowing the restore process to complete,
> > > which then triggers another rebalance?  In other words, the restore
> > > process is simply taking too long given the amount of data needed to
> > > restore and the slow storage?  I’m currently using Kafka 2.4.1, but I
> > > saw this same behavior in 2.3.  I am using a custom RocksDB config
> > > setter to limit off-heap memory, but I’ve tried removing that and saw
> > > no difference in the rebalance problem.  Again, no errors that I’m
> > > seeing or anything else in the logs that seems to indicate why it can
> > > never finish rebalancing.  I’ve tried turning on DEBUG logging but I’m
> > > having a tough time sifting through the amount of log messages, though
> > > I’m still looking.
> > >
> > > If anyone has any ideas I would appreciate it, thanks!
> > >
> > > Alex C
> > >
> >
> 
