[ https://issues.apache.org/jira/browse/KAFKA-5875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ray Chiang updated KAFKA-5875:
------------------------------
    Component/s: consumer

> Consumer group repeatedly fails to join, even across JVM restarts: BufferUnderflowException reading the {{version}} field in the consumer protocol header
> ----------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5875
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5875
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: Evan Pollan
>            Priority: Major
>
> I've seen this maybe once a month in our large cluster of Kubernetes-based Kafka consumers & producers. Every once in a while a consumer in a Kubernetes "pod" gets this error trying to join a consumer group:
> {code}
> {"level":"INFO","@timestamp":"2017-09-12T13:45:42.173+0000","logger":"org.apache.kafka.common.utils.AppInfoParser","message":"Kafka version : 0.11.0.0","exception":""}
> {"level":"INFO","@timestamp":"2017-09-12T13:45:42.173+0000","logger":"org.apache.kafka.common.utils.AppInfoParser","message":"Kafka commitId : cb8625948210849f","exception":""}
> {"level":"INFO","@timestamp":"2017-09-12T13:45:42.178+0000","logger":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","message":"Revoking previously assigned partitions [] for group conv-fetch-jobs-runner-for-internal","exception":""}
> {"level":"INFO","@timestamp":"2017-09-12T13:45:42.178+0000","logger":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"(Re-)joining group conv-fetch-jobs-runner-for-internal","exception":""}
> {"level":"INFO","@timestamp":"2017-09-12T13:45:43.588+0000","logger":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"Successfully joined group conv-fetch-jobs-runner-for-internal with generation 17297","exception":""}
> {"errorType":"Error reading field 'version': java.nio.BufferUnderflowException","level":"ERROR","message":"Died!","operation":"Died!","stacktrace":"org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException\n\tat org.apache.kafka.common.protocol.types.Schema.read(Schema.java:75)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:220)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)\n\tat com.spredfast.common.kafka.consumer.RunnableConsumer.pollOnce(RunnableConsumer.java:141)\n\tat com.spredfast.common.kafka.consumer.RunnableConsumer.access$000(RunnableConsumer.java:28)\n\tat com.spredfast.common.kafka.consumer.RunnableConsumer$Loop.iterate(RunnableConsumer.java:125)\n\tat com.spredfast.common.kafka.consumer.RunnableConsumer.run(RunnableConsumer.java:78)\n\tat java.lang.Thread.run(Thread.java:745)\n","trackingId":"dead-consumer","logger":"com.spredfast.common.kafka.consumer.RunnableConsumer","loggingVersion":"UNKNOWN","component":"MetricsFetch","pid":25,"host":"fetch-2420457278-sh4f5","@timestamp":"2017-09-12T13:45:43.613Z"}
> {code}
> Pardon the log format -- these get sucked into logstash, thus the JSON. Here's the raw stacktrace:
> {code}
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'version': java.nio.BufferUnderflowException
> 	at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:75)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:220)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> 	at com.spredfast.common.kafka.consumer.RunnableConsumer.pollOnce(RunnableConsumer.java:141)
> 	at com.spredfast.common.kafka.consumer.RunnableConsumer.access$000(RunnableConsumer.java:28)
> 	at com.spredfast.common.kafka.consumer.RunnableConsumer$Loop.iterate(RunnableConsumer.java:125)
> 	at com.spredfast.common.kafka.consumer.RunnableConsumer.run(RunnableConsumer.java:78)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
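>
> For context on what that top frame is doing: judging by the error message, the consumer protocol header begins with a 16-bit {{version}} field, so deserializing an assignment buffer that is empty (or shorter than two bytes) fails on the very first read, and {{Schema.read}} wraps the {{BufferUnderflowException}} in the {{SchemaException}} above. A minimal sketch of just that mechanism -- this is _not_ the Kafka client code, and the class name is made up:
> {code}
> import java.nio.BufferUnderflowException;
> import java.nio.ByteBuffer;
>
> // Illustration of the failure mode only -- NOT the actual Kafka client code.
> public class VersionFieldRepro {
>     public static void main(String[] args) {
>         ByteBuffer emptyAssignment = ByteBuffer.allocate(0); // an empty/undersized assignment
>         try {
>             short version = emptyAssignment.getShort(); // the "version" read needs 2 bytes
>             System.out.println("version = " + version);
>         } catch (BufferUnderflowException e) {
>             // Schema.read() turns exactly this into
>             // SchemaException("Error reading field 'version': java.nio.BufferUnderflowException")
>             System.out.println("BufferUnderflowException, as in the stack trace above");
>         }
>     }
> }
> {code}
> So whatever bytes the client ends up handing to {{deserializeAssignment}} in this state appear to be empty or truncated.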
>
> What's fascinating about this is:
> * We have a liveness probe (Kubernetes term for a healthcheck whose failure will cause the container backing the "pod" to be killed and restarted) attached to the existence of dead consumers. When this situation happens, it _never_ resolves itself. Today, I found a pod that had been restarted 1023 times due to this error.
> * The only way to make it go away is to _delete_ the Kubernetes pod. This causes it to be replaced by a pod on another Kubernetes host ("minion") using the same Docker image and configuration. Invariably, this pod comes up and all of its consumers join just fine.
> * We have _not_ tried restarting the brokers when this happens.
> * There must be something about the pod, container, or Kubernetes host that is consistent across pod crash loops and that factors into the consumer group join process -- MAC address? Hostname? It can't be anything that is recomputed on JVM restart, though...
>
> Seems like there's either:
> # a bug in the client (i.e. in its assumption that it can deserialize a protocol header on successful return of that join future) -- maybe there's a flavor of broker response that doesn't include this header?
> # a bug in the broker, in that it's sending an empty or undersized response to a group join command in some situations.
>
> It's worth noting that the severity of this issue is magnified by the fact that it requires manual intervention. It wouldn't be so bad if our healthcheck failure tripped a pod restart and the new JVM's consumers joined OK. But the fact that even a JVM restart doesn't fix it means most resiliency plays won't work.
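>
> To make that healthcheck interaction concrete, a consumer loop of the sort described above might look like the hypothetical sketch below (class and method names are illustrative, not the real {{RunnableConsumer}}): the {{SchemaException}} escapes {{poll()}}, the consumer is marked dead, the liveness probe starts failing, and Kubernetes restarts the container -- which, per the above, just lands back in the same broken state.
> {code}
> import java.util.Collections;
> import java.util.Properties;
> import java.util.concurrent.atomic.AtomicBoolean;
>
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.protocol.types.SchemaException;
>
> // Hypothetical wrapper, for illustration only -- not the real RunnableConsumer.
> public class ProbedConsumer implements Runnable {
>     private final AtomicBoolean dead = new AtomicBoolean(false);
>     private final Properties props;
>     private final String topic;
>
>     public ProbedConsumer(Properties props, String topic) {
>         this.props = props;
>         this.topic = topic;
>     }
>
>     @Override
>     public void run() {
>         try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
>             consumer.subscribe(Collections.singletonList(topic));
>             while (!dead.get()) {
>                 try {
>                     // The failed group join surfaces here as
>                     // SchemaException: Error reading field 'version': ...
>                     for (ConsumerRecord<byte[], byte[]> record : consumer.poll(1000L)) {
>                         handle(record);
>                     }
>                 } catch (SchemaException e) {
>                     dead.set(true); // nothing in-process recovers from this
>                 }
>             }
>         }
>     }
>
>     // Wired to the Kubernetes liveness probe; once this returns false, the
>     // container is killed and restarted (and, per this bug, fails the same way again).
>     public boolean isAlive() {
>         return !dead.get();
>     }
>
>     private void handle(ConsumerRecord<byte[], byte[]> record) {
>         // application-specific processing
>     }
> }
> {code}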
>
> BTW, I see a similar schema read failure in https://issues.apache.org/jira/browse/KAFKA-4349, although the client code is completely different (the admin {{ConsumerGroupCommand}}).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)