[
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jon Lee updated KAFKA-7403:
---------------------------
Description:
I am currently trying broker upgrade from 0.11 to 2.0 with some patches
including KIP-211/KAFKA-4682. After the upgrade, however, applications with
0.10.2 Kafka clients failed with the following error:
{code:java}
2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting.
org.apache.kafka.common.KafkaException: Unexpected error in commit: The server
experienced an unexpected error when processing the request at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784)
~[kafka-clients-0.10.2.86.jar:?] at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722)
~[kafka-clients-0.10.2.86.jar:?] at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
~[kafka-clients-0.10.2.86.jar:?] at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
~[kafka-clients-0.10.2.86.jar:?] at
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
~[kafka-clients-0.10.2.86.jar:?] at
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
~[kafka-clients-0.10.2.86.jar:?] at
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
~[kafka-clients-0.10.2.86.jar:?] at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
~[kafka-clients-0.10.2.86.jar:?] at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
~[kafka-clients-0.10.2.86.jar:?] at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
~[kafka-clients-0.10.2.86.jar:?]
{code}
>From my reading of the code, it looks like the following happened:
# The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets the
retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
# In the 2.0 broker code, upon receiving an OffsetCommitRequest with
DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the
"expireTimestamp" field of OffsetAndMetadata to None.
# Later in the code path, GroupMetadataManager.offsetCommitValue() expects
OffsetAndMetadata to have a non-empty "expireTimestamp" field if the
inter.broker.protocol.version is < KAFKA_2_1_IV0.
# However, the inter.broker.protocol.version was set to "1.0" prior to the
upgrade, and as a result, the following code in offsetCommitValue() raises an
error because expireTimestamp is None:
{code:java}
value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1,
offsetAndMetadata.expireTimestamp.get){code}
Here is the stack trace for the broker side error
{code:java}
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?]
at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?]
at
kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109)
~[kafka_2.11-2.0.0.10.jar:?]
at
kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326)
~[kafka_2.11-2.0.0.10.jar:?]
at
kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324)
~[kafka_2.11-2.0.0.10.jar:?]
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
~[scala-library-2.11.12.jar:?]
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
~[scala-library-2.11.12.jar:?]
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
~[scala-library-2.11.12.jar:?]
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
~[scala-library-2.11.12.jar:?]
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
~[scala-library-2.11.12.jar:?]
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
~[scala-library-2.11.12.jar:?]
at
kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:324)
~[kafka_2.11-2.0.0.10.jar:?]
at
kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply$mcV$sp(GroupCoordinator.scala:521)
~[kafka_2.11-2.0.0.10.jar:?]
at
kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:506)
~[kafka_2.11-2.0.0.10.jar:?]
at
kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:506)
~[kafka_2.11-2.0.0.10.jar:?]
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
~[kafka_2.11-2.0.0.10.jar:?]
at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:193)
~[kafka_2.11-2.0.0.10.jar:?]
at
kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:505)
~[kafka_2.11-2.0.0.10.jar:?]
at
kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:484)
~[kafka_2.11-2.0.0.10.jar:?]
at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:359)
~[kafka_2.11-2.0.0.10.jar:?]
at kafka.server.KafkaApis.handle(KafkaApis.scala:114)
~[kafka_2.11-2.0.0.10.jar:?]
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
~[kafka_2.11-2.0.0.10.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
{code}
And I was able to reproduce the error by passing KAFKA_0_11_0_IV2 as the
ApiVersion (the second parameter) to the constructor of GroupMetadataManager in
GroupMetadataManagerTest.scala.
[~vahid], the error was from the code added for KAFKA-4682. Can you take a look
if this is indeed an issue?
was:
I am currently trying broker upgrade from 0.11 to 2.0 with some patches
including KIP-211/KAFKA-4682. After the upgrade, however, applications with
0.10.2 Kafka clients failed with the following error:
{code:java}
2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting.
org.apache.kafka.common.KafkaException: Unexpected error in commit: The server
experienced an unexpected error when processing the request at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784)
~[kafka-clients-0.10.2.86.jar:?] at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722)
~[kafka-clients-0.10.2.86.jar:?] at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
~[kafka-clients-0.10.2.86.jar:?] at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
~[kafka-clients-0.10.2.86.jar:?] at
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
~[kafka-clients-0.10.2.86.jar:?] at
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
~[kafka-clients-0.10.2.86.jar:?] at
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
~[kafka-clients-0.10.2.86.jar:?] at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
~[kafka-clients-0.10.2.86.jar:?] at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
~[kafka-clients-0.10.2.86.jar:?] at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
~[kafka-clients-0.10.2.86.jar:?]
{code}
>From my reading of the code, it looks like the following happened:
# The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets the
retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
# In the 2.0 broker code, upon receiving an OffsetCommitRequest with
DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the
"expireTimestamp" field of OffsetAndMetadata to None.
# Later in the code path, GroupMetadataManager.offsetCommitValue() expects
OffsetAndMetadata to have a non-empty "expireTimestamp" field if the
inter.broker.protocol.version is < KAFKA_2_1_IV0.
# However, the inter.broker.protocol.version was set to "1.0" prior to the
upgrade, and as a result, the following code in offsetCommitValue() raises an
error because expireTimestamp is None:
{code:java}
value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1,
offsetAndMetadata.expireTimestamp.get){code}
Here is the stack trace for the broker side error
{code:java}
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?]
at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?]
at
kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109)
~[kafka_2.11-2.0.0.10.jar:?]
at
kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326)
~[kafka_2.11-2.0.0.10.jar:?]
at
kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324)
~[kafka_2.11-2.0.0.10.jar:?]
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
~[scala-library-2.11.12.jar:?]
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
~[scala-library-2.11.12.jar:?]
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
~[scala-library-2.11.12.jar:?]
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
~[scala-library-2.11.12.jar:?]
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
~[scala-library-2.11.12.jar:?]
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
~[scala-library-2.11.12.jar:?]
at
kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:324)
~[kafka_2.11-2.0.0.10.jar:?]
at
kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply$mcV$sp(GroupCoordinator.scala:521)
~[kafka_2.11-2.0.0.10.jar:?]
at
kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:506)
~[kafka_2.11-2.0.0.10.jar:?]
at
kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:506)
~[kafka_2.11-2.0.0.10.jar:?]
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
~[kafka_2.11-2.0.0.10.jar:?]
at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:193)
~[kafka_2.11-2.0.0.10.jar:?]
at
kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:505)
~[kafka_2.11-2.0.0.10.jar:?]
at
kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:484)
~[kafka_2.11-2.0.0.10.jar:?]
at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:359)
~[kafka_2.11-2.0.0.10.jar:?]
at kafka.server.KafkaApis.handle(KafkaApis.scala:114)
~[kafka_2.11-2.0.0.10.jar:?]
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
~[kafka_2.11-2.0.0.10.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
{code}
And I was able to reproduce the error by passing KAFKA_0_11_0_IV2 as the
ApiVersion (the second parameter) to the constructor of GroupMetadataManager in
GroupMetadataManagerTest.scala.
[~vahid], can you take a look if this is indeed an issue?
> Offset commit failure after broker upgrade to 2.0
> -------------------------------------------------
>
> Key: KAFKA-7403
> URL: https://issues.apache.org/jira/browse/KAFKA-7403
> Project: Kafka
> Issue Type: Bug
> Components: core
> Reporter: Jon Lee
> Priority: Major
>
> I am currently trying broker upgrade from 0.11 to 2.0 with some patches
> including KIP-211/KAFKA-4682. After the upgrade, however, applications with
> 0.10.2 Kafka clients failed with the following error:
> {code:java}
> 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting.
> org.apache.kafka.common.KafkaException: Unexpected error in commit: The
> server experienced an unexpected error when processing the request at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784)
> ~[kafka-clients-0.10.2.86.jar:?] at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722)
> ~[kafka-clients-0.10.2.86.jar:?] at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
> ~[kafka-clients-0.10.2.86.jar:?] at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
> ~[kafka-clients-0.10.2.86.jar:?] at
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
> ~[kafka-clients-0.10.2.86.jar:?] at
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
> ~[kafka-clients-0.10.2.86.jar:?] at
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
> ~[kafka-clients-0.10.2.86.jar:?] at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
> ~[kafka-clients-0.10.2.86.jar:?] at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
> ~[kafka-clients-0.10.2.86.jar:?] at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
> ~[kafka-clients-0.10.2.86.jar:?]
> {code}
> From my reading of the code, it looks like the following happened:
> # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets
> the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
> # In the 2.0 broker code, upon receiving an OffsetCommitRequest with
> DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the
> "expireTimestamp" field of OffsetAndMetadata to None.
> # Later in the code path, GroupMetadataManager.offsetCommitValue() expects
> OffsetAndMetadata to have a non-empty "expireTimestamp" field if the
> inter.broker.protocol.version is < KAFKA_2_1_IV0.
> # However, the inter.broker.protocol.version was set to "1.0" prior to the
> upgrade, and as a result, the following code in offsetCommitValue() raises an
> error because expireTimestamp is None:
> {code:java}
> value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1,
> offsetAndMetadata.expireTimestamp.get){code}
>
> Here is the stack trace for the broker side error
> {code:java}
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?]
> at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?]
> at
> kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109)
> ~[kafka_2.11-2.0.0.10.jar:?]
> at
> kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326)
> ~[kafka_2.11-2.0.0.10.jar:?]
> at
> kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324)
> ~[kafka_2.11-2.0.0.10.jar:?]
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> ~[scala-library-2.11.12.jar:?]
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> ~[scala-library-2.11.12.jar:?]
> at
> kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:324)
> ~[kafka_2.11-2.0.0.10.jar:?]
> at
> kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply$mcV$sp(GroupCoordinator.scala:521)
> ~[kafka_2.11-2.0.0.10.jar:?]
> at
> kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:506)
> ~[kafka_2.11-2.0.0.10.jar:?]
> at
> kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:506)
> ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
> ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:193)
> ~[kafka_2.11-2.0.0.10.jar:?]
> at
> kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:505)
> ~[kafka_2.11-2.0.0.10.jar:?]
> at
> kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:484)
> ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:359)
> ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.server.KafkaApis.handle(KafkaApis.scala:114)
> ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
> ~[kafka_2.11-2.0.0.10.jar:?]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> {code}
>
> And I was able to reproduce the error by passing KAFKA_0_11_0_IV2 as the
> ApiVersion (the second parameter) to the constructor of GroupMetadataManager
> in GroupMetadataManagerTest.scala.
>
> [~vahid], the error was from the code added for KAFKA-4682. Can you take a
> look if this is indeed an issue?
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)