[
https://issues.apache.org/jira/browse/KAFKA-7576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rajini Sivaram resolved KAFKA-7576.
-----------------------------------
Resolution: Fixed
Reviewer: Jason Gustafson
> Dynamic update of replica fetcher threads may fail to start/close fetchers
> --------------------------------------------------------------------------
>
> Key: KAFKA-7576
> URL: https://issues.apache.org/jira/browse/KAFKA-7576
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 1.1.1, 2.0.1, 2.1.0
> Reporter: Rajini Sivaram
> Assignee: Rajini Sivaram
> Priority: Major
> Fix For: 1.1.2, 2.1.1, 2.0.2
>
>
> KAFKA-6051 moved the shutdown of ReplicaFetcherBlockingSend earlier in the
> shutdown sequence of ReplicaFetcherThread. As a result, shutdown of replica
> fetchers can now throw an exception, because the Selector may be closed on
> one thread while another thread is still writing data to it. KAFKA-7464
> changed this behaviour for 2.0.1 and 2.1.0: the exception during close() is
> now logged and not propagated, to avoid exceptions during broker shutdown.
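The IllegalArgumentException in the stack trace below originates in java.nio.Buffer.position(): if the close path shrinks a buffer's limit while a writer on another thread tries to set a now-stale position, the position > limit check fails. A minimal, single-threaded sketch of that failure mode (the buffer sizes are illustrative, not Kafka's):

```java
import java.nio.ByteBuffer;

public class StalePositionDemo {
    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        // Simulate the close path shrinking the limit underneath a writer.
        buf.limit(8);
        try {
            // A writer setting a stale position past the new limit fails with
            // the same IllegalArgumentException seen at Buffer.position(244).
            buf.position(12);
        } catch (IllegalArgumentException e) {
            System.out.println("caught IllegalArgumentException");
        }
    }
}
```

In the real race the two operations run on different threads, so whether the write or the close fails first is timing dependent; the sketch only reproduces the deterministic buffer invariant that is violated.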
> When a config update notification for `num.replica.fetchers` is processed,
> partitions are migrated as necessary to increase or decrease the number of
> fetcher threads. Existing fetchers are shut down before new ones are
> created. This migration is performed on the thread processing the ZK change
> notification. Shutting down the Selector of an existing fetcher is not safe,
> since the replica fetcher thread may be processing data using the same
> Selector at that moment.
> Without the fix from KAFKA-7464, if shutdown encounters an exception,
> another config update or a broker restart is required to restart the
> replica fetchers after the dynamic config update.
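The shutdown-then-create ordering described above is why a failed shutdown leaves the broker with no fetchers. A hypothetical sketch (the name resizeThreadPool is taken from the stack trace; the body is illustrative, not Kafka's actual implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class FetcherPoolSketch {
    final List<Thread> fetchers = new ArrayList<>();

    // Existing fetchers are shut down first; if any shutdown throws, the
    // second loop never runs and no new fetchers are started.
    void resizeThreadPool(int newSize) throws InterruptedException {
        for (Thread fetcher : fetchers) {
            fetcher.interrupt();   // stop the fetch loop
            fetcher.join();        // an exception before this aborts the resize
        }
        fetchers.clear();
        for (int i = 0; i < newSize; i++) {
            Thread fetcher = new Thread(() -> { /* replica fetch loop */ });
            fetcher.start();
            fetchers.add(fetcher);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FetcherPoolSketch pool = new FetcherPoolSketch();
        pool.resizeThreadPool(2);
        pool.resizeThreadPool(4);  // old 2 stopped, then 4 started
        System.out.println(pool.fetchers.size());
    }
}
```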
> Exception stack trace:
> {code:java}
> java.lang.IllegalArgumentException
> at java.nio.Buffer.position(Buffer.java:244)
> at sun.nio.ch.IOUtil.write(IOUtil.java:68)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:210)
> at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:160)
> at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
> at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:70)
> at org.apache.kafka.common.network.Selector.doClose(Selector.java:748)
> at org.apache.kafka.common.network.Selector.close(Selector.java:736)
> at org.apache.kafka.common.network.Selector.close(Selector.java:698)
> at org.apache.kafka.common.network.Selector.close(Selector.java:314)
> at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:533)
> at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
> at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:90)
> at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:86)
> at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:76)
> at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:72)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
> at kafka.server.AbstractFetcherManager.migratePartitions$1(AbstractFetcherManager.scala:72)
> at kafka.server.AbstractFetcherManager.resizeThreadPool(AbstractFetcherManager.scala:88)
> at kafka.server.DynamicThreadPool.reconfigure(DynamicBrokerConfig.scala:574)
> at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
> at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at kafka.server.DynamicBrokerConfig.kafka$server$DynamicBrokerConfig$$updateCurrentConfig(DynamicBrokerConfig.scala:410)
> <SKIP>kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:135)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> The fix from KAFKA-7464 in 2.0.1 and 2.1.0 avoids the issue with creating
> replica fetchers during a dynamic update. But even on those branches, we
> should clean up the Selector to avoid a resource leak in the dynamic config
> update case (discarding the exception may be sufficient when the broker is
> shutting down).
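One way to sketch the suggested cleanup (a hypothetical helper, not Kafka's actual API): swallow the exception from close() so the caller proceeds, while still invoking close() on every resource so the dynamic resize does not leak sockets:

```java
public class QuietClose {
    // Close a resource without propagating: the exception is logged and
    // dropped, so a dynamic-config resize (or broker shutdown) continues
    // past a failed Selector close instead of aborting.
    static void closeQuietly(AutoCloseable resource, String name) {
        if (resource == null) return;
        try {
            resource.close();
        } catch (Exception e) {
            System.err.println("Ignoring exception closing " + name + ": " + e);
        }
    }

    public static void main(String[] args) {
        closeQuietly(() -> { throw new IllegalStateException("boom"); },
                "selector");
        System.out.println("resize continues");
    }
}
```

Logging-and-continuing is only safe here because the caller is tearing the fetcher down anyway; during normal operation the exception would still need to surface.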
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)