[ https://issues.apache.org/jira/browse/KAFKA-5381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018380#comment-17018380 ]

mjuarez commented on KAFKA-5381:
--------------------------------

We ran into the same exception with Kafka version 1.1.1.
{noformat}
[2020-01-17 02:33:55,203] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.142.17.157:9092-10.142.49.6:58884-36452 (kafka.network.Processor)
[2020-01-17 02:33:55,203] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.142.17.157:9092-10.142.52.52:37878-36458 (kafka.network.Processor)
[2020-01-17 02:33:55,203] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.142.17.157:9092-10.142.37.248:43442-36340 (kafka.network.Processor)
[2020-01-17 02:33:55,206] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.142.17.157:9092-10.142.16.9:54542-35541 (kafka.network.Processor)
[2020-01-17 02:33:55,207] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.142.17.157:9092-10.142.36.83:53798-36337 (kafka.network.Processor)
[2020-01-17 02:33:55,215] INFO [Partition sample_topic_error-9 broker=6] 
Shrinking ISR from 6,2,1 to 6 (kafka.cluster.Partition)
[2020-01-17 02:33:55,216] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.142.17.157:9092-10.142.43.25:47680-36360 (kafka.network.Processor)
[2020-01-17 02:33:55,220] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.142.17.157:9092-10.142.36.38:53360-36328 (kafka.network.Processor)
[2020-01-17 02:33:55,240] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.142.17.157:9092-10.142.51.81:38058-36536 (kafka.network.Processor)
[2020-01-17 02:33:55,240] WARN Attempting to send response via channel for 
which there is no open connection, connection id 
10.142.17.157:9092-10.142.41.225:49834-36329 (kafka.network.Processor)
[2020-01-17 02:33:55,283] INFO [GroupCoordinator 6]: Preparing to rebalance 
group light_group_v3 with old generation 226 (__consumer_offsets-32) 
(kafka.coordinator.group.GroupCoordinator)
[2020-01-17 02:33:55,296] INFO [ZooKeeperClient] Waiting until connected. 
(kafka.zookeeper.ZooKeeperClient)
[2020-01-17 02:33:55,296] INFO [ZooKeeperClient] Connected. 
(kafka.zookeeper.ZooKeeperClient)
[2020-01-17 02:33:55,797] INFO [GroupCoordinator 6]: Preparing to rebalance 
group analytics_completed with old generation 394 (__consumer_offsets-47) 
(kafka.coordinator.group.GroupCoordinator)
[2020-01-17 02:33:55,913] INFO [ZooKeeperClient] Session expired. 
(kafka.zookeeper.ZooKeeperClient)
[2020-01-17 02:33:55,915] INFO [ZooKeeperClient] Initializing a new session to 
prod-zk1.prod.core.company.org:2181,prod-zk2.prod.core.company.org:2181,prod-zk3.prod.core.company.org:2181.
 (kafka.zookeeper.ZooKeeperClient)
[2020-01-17 02:33:55,916] INFO Creating /brokers/ids/6 (is it secure? false) 
(kafka.zk.KafkaZkClient)
[2020-01-17 02:33:55,921] INFO Processing notification(s) to /config/changes 
(kafka.common.ZkNodeChangeNotificationListener)
[2020-01-17 02:33:55,922] INFO Result of znode creation at /brokers/ids/6 is: 
OK (kafka.zk.KafkaZkClient)
[2020-01-17 02:33:55,922] INFO Registered broker 6 at path /brokers/ids/6 with 
addresses: 
ArrayBuffer(EndPoint(prod-kafka6.prod.core.company.org,9092,ListenerName(PLAINTEXT),PLAINTEXT))
 (kafka.zk.KafkaZkClient)
[2020-01-17 02:33:56,016] ERROR Uncaught exception in scheduled task 
'isr-expiration' (kafka.utils.KafkaScheduler)
org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = 
Session expired for /brokers/topics/sample_topic_error/partitions/9/state
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:127)
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
        at 
kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:487)
        at kafka.zk.KafkaZkClient.conditionalUpdatePath(KafkaZkClient.scala:631)
        at 
kafka.utils.ReplicationUtils$.updateLeaderAndIsr(ReplicationUtils.scala:33)
        at 
kafka.cluster.Partition.kafka$cluster$Partition$$updateIsr(Partition.scala:727)
        at kafka.cluster.Partition$$anonfun$2.apply$mcZ$sp(Partition.scala:545)
        at kafka.cluster.Partition$$anonfun$2.apply(Partition.scala:536)
        at kafka.cluster.Partition$$anonfun$2.apply(Partition.scala:536)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
        at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
        at kafka.cluster.Partition.maybeShrinkIsr(Partition.scala:535)
        at 
kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:1330)
        at 
kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:1330)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at 
kafka.server.ReplicaManager.kafka$server$ReplicaManager$$maybeShrinkIsr(ReplicaManager.scala:1330)
        at 
kafka.server.ReplicaManager$$anonfun$2.apply$mcV$sp(ReplicaManager.scala:317)
        at 
kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:114)
        at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748){noformat}
It seems there may have been a network blip affecting some of our Kubernetes 
consumers in AWS, which caused the broker to shrink the ISR for at least one 
topic (see the entry at {{[2020-01-17 02:33:55,215]}}). However, shortly after 
that, we saw the same {{SessionExpiredException}}, and all the partitions on 
this particular node were taken out of the ISRs. We had to restart the Kafka 
process on that node and issue a leader election command to get the cluster 
back to normal.
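For reference, the leader election step was roughly along these lines on 1.1.1 
(the ZooKeeper connect string matches the one in the log above; without a JSON 
file the tool elects preferred leaders for all partitions):
{noformat}
# Kafka 1.1.1: trigger preferred leader election for all partitions
bin/kafka-preferred-replica-election.sh \
  --zookeeper prod-zk1.prod.core.company.org:2181
{noformat}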

Happy to provide more details if needed.

> ERROR Uncaught exception in scheduled task 'delete-expired-consumer-offsets' 
> (kafka.utils.KafkaScheduler)
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5381
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5381
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jeff Widman
>            Priority: Major
>
> We have a 6-node cluster of 0.10.0.1 brokers. Broker 4 had a hardware 
> problem, so we re-assigned all its partitions to other brokers. We 
> immediately started observing the error described in KAFKA-4362 from several 
> of our consumers.
> However, on broker 6, we also started seeing the following exceptions in 
> {{KafkaScheduler}} which have a somewhat similar-looking traceback:
> {code}
> [2017-06-03 17:23:57,926] ERROR Uncaught exception in scheduled task 
> 'delete-expired-consumer-offsets' (kafka.utils.KafkaScheduler)
> java.lang.IllegalArgumentException: Message format version for partition 50 
> not found
>         at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>         at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>         at scala.Option.getOrElse(Option.scala:121)
>         at 
> kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632)
>         at 
> kafka.coordinator.GroupMetadataManager$$anonfun$2$$anonfun$10.apply(GroupMetadataManager.scala:560)
>         at 
> kafka.coordinator.GroupMetadataManager$$anonfun$2$$anonfun$10.apply(GroupMetadataManager.scala:551)
>         at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.immutable.List.foreach(List.scala:381)
>         at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>         at scala.collection.immutable.List.map(List.scala:285)
>         at 
> kafka.coordinator.GroupMetadataManager$$anonfun$2.apply$mcI$sp(GroupMetadataManager.scala:551)
>         at 
> kafka.coordinator.GroupMetadataManager$$anonfun$2.apply(GroupMetadataManager.scala:543)
>         at 
> kafka.coordinator.GroupMetadataManager$$anonfun$2.apply(GroupMetadataManager.scala:543)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
>         at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:239)
>         at 
> kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$deleteExpiredOffsets(GroupMetadataManager.scala:543)
>         at 
> kafka.coordinator.GroupMetadataManager$$anonfun$1.apply$mcV$sp(GroupMetadataManager.scala:87)
>         at 
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
>         at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:56)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
> Unsurprisingly, the error disappeared once {{offsets.retention.minutes}} 
> had elapsed.
> This appears to have a similar root cause to KAFKA-4362, where 
> {{GroupMetadataManager.getMessageFormatVersionAndTimestamp}} throws the 
> error because the offsets partition has been moved, but I'm unclear whether 
> the fix for that issue also covered the {{KafkaScheduler}} path or if more 
> work needs to be done here.
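>
> For context, here is a minimal self-contained sketch of the failure mode 
> (hypothetical names, not the actual Kafka source): the format-version lookup 
> comes back empty once the broker no longer hosts the offsets partition, and 
> the {{getOrElse}} default throws the {{IllegalArgumentException}} seen above.
> {code}
> // Hypothetical sketch, not Kafka source: after re-assignment this broker no
> // longer hosts __consumer_offsets partition 50, so the lookup misses and the
> // getOrElse default throws, killing the scheduled task as in the traceback.
> object FormatVersionLookupSketch {
>   // Partitions this broker still hosts, mapped to their message format version.
>   private val hostedPartitions: Map[Int, Byte] = Map(0 -> 0.toByte, 1 -> 0.toByte)
>
>   def messageFormatVersion(partition: Int): Byte =
>     hostedPartitions.getOrElse(partition,
>       throw new IllegalArgumentException(
>         s"Message format version for partition $partition not found"))
>
>   def main(args: Array[String]): Unit =
>     messageFormatVersion(50) // the moved partition -> throws, as in the log
> }
> {code}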
> We did the partition re-assignment by using the 
> {{kafka-reassign-partitions.sh}} script and giving it the five healthy 
> brokers. From my understanding, this would have randomly re-assigned all 
> partitions (I don't think it's sticky), so probably at least one partition 
> from the {{__consumer_offsets}} topic was removed from broker 6. However, if 
> that were the case, I would have expected all brokers to have had these 
> partitions removed and to be throwing this error. But our logging 
> infrastructure shows this error only on broker 6, not on the other brokers. 
> Not sure why that is.
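>
> For completeness, the re-assignment was the standard generate/execute flow, 
> roughly as sketched below (the ZooKeeper host and file names are 
> illustrative; broker 4 is the one excluded from the broker list):
> {code}
> # Generate a candidate assignment over the five healthy brokers
> bin/kafka-reassign-partitions.sh --zookeeper zk:2181 \
>   --topics-to-move-json-file topics-to-move.json \
>   --broker-list "1,2,3,5,6" --generate
> # Then apply the proposed assignment
> bin/kafka-reassign-partitions.sh --zookeeper zk:2181 \
>   --reassignment-json-file reassignment.json --execute
> {code}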


