[jira] [Created] (KAFKA-8921) Avoid excessive info logs in the client side for incremental fetch
Zhanxiang (Patrick) Huang created KAFKA-8921: Summary: Avoid excessive info logs in the client side for incremental fetch Key: KAFKA-8921 URL: https://issues.apache.org/jira/browse/KAFKA-8921 Project: Kafka Issue Type: Improvement Components: consumer Reporter: Zhanxiang (Patrick) Huang Assignee: Zhanxiang (Patrick) Huang Currently in FetchSessionHandler::handleResponse, the following info log gets printed out excessively when the session is evicted from the server-side cache, even though there is nothing wrong with the fetch request and the client cannot do much about it.
{noformat}
Node xxx was unable to process the fetch request with (sessionId=xxx, epoch=xxx): FETCH_SESSION_ID_NOT_FOUND.
{noformat}
Moreover, when the fetch request gets throttled, the following info logs also get printed out, which are very misleading.
{noformat}
Node xxx sent an invalid full fetch response with ...
Node xxx sent an invalid incremental fetch response with ...
{noformat}
We should avoid logging these at INFO level and print more informative logs for the throttling case. -- This message was sent by Atlassian Jira (v8.3.4#803005)
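For illustration, here is a minimal sketch of the first change (the object and method names below are assumptions for the sketch, not the actual FetchSessionHandler code): demote the session-eviction case to DEBUG, since the client transparently falls back to a full fetch request.
{code:java}
import org.apache.kafka.common.protocol.Errors
import org.slf4j.LoggerFactory

object FetchErrorLoggingSketch {
  private val log = LoggerFactory.getLogger(getClass)

  def logFetchError(node: String, sessionId: Int, epoch: Int, error: Errors): Unit = error match {
    case Errors.FETCH_SESSION_ID_NOT_FOUND =>
      // The server merely evicted the session from its cache; the client will retry
      // with a full fetch request, so DEBUG is enough here.
      log.debug(s"Node $node evicted the fetch session (sessionId=$sessionId, epoch=$epoch); will send a full fetch request.")
    case other =>
      log.info(s"Node $node was unable to process the fetch request with (sessionId=$sessionId, epoch=$epoch): $other.")
  }
}
{code}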
[jira] [Created] (KAFKA-8668) Improve broker shutdown time
Zhanxiang (Patrick) Huang created KAFKA-8668: Summary: Improve broker shutdown time Key: KAFKA-8668 URL: https://issues.apache.org/jira/browse/KAFKA-8668 Project: Kafka Issue Type: Improvement Reporter: Zhanxiang (Patrick) Huang Assignee: Zhanxiang (Patrick) Huang During LogManager shutdown, we call {{timeIndex.maybeAppend()}} when closing each log segment. Since in 2.0 we completely skip the sanity check and lazily mmap the indexes of segments below the recovery point, {{timeIndex.maybeAppend()}} may need to mmap the time index file of an inactive segment if it wasn't mmapped before, which is unnecessary because the time index hasn't changed at all. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
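A hedged sketch of the idea (the types and names below are hypothetical, not the actual kafka.log classes): keep the time index behind a lazy holder and only call {{maybeAppend()}} on close if the index was actually materialized, so untouched segments below the recovery point can be closed without mmapping anything.
{code:java}
import java.util.concurrent.atomic.AtomicReference

trait TimeIndexLike {
  def maybeAppend(timestamp: Long, offset: Long): Unit
  def close(): Unit
}

// Holder that materializes (mmaps) the time index only on first access.
class LazyTimeIndex(load: () => TimeIndexLike) {
  private val ref = new AtomicReference[TimeIndexLike](null)
  def isMaterialized: Boolean = ref.get() != null
  def get: TimeIndexLike = {
    if (ref.get() == null) ref.compareAndSet(null, load())
    ref.get()
  }
}

class SegmentSketch(timeIndex: LazyTimeIndex, largestTimestamp: Long, offsetOfLargestTimestamp: Long) {
  def close(): Unit = {
    // Only append to and close the time index if it was ever mmapped; otherwise nothing changed.
    if (timeIndex.isMaterialized) {
      timeIndex.get.maybeAppend(largestTimestamp, offsetOfLargestTimestamp)
      timeIndex.get.close()
    }
  }
}
{code}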
[jira] [Created] (KAFKA-8667) Improve leadership transition time
Zhanxiang (Patrick) Huang created KAFKA-8667: Summary: Improve leadership transition time Key: KAFKA-8667 URL: https://issues.apache.org/jira/browse/KAFKA-8667 Project: Kafka Issue Type: Improvement Reporter: Zhanxiang (Patrick) Huang Assignee: Zhanxiang (Patrick) Huang When a replica fetcher thread processes a fetch response, it holds the {{partitionMapLock}}. If a LeaderAndIsr request comes in at the same time, it will be blocked at the end of its processing when calling {{shutdownIdleFetcherThread}}, because the request handler thread needs to acquire the {{partitionMapLock}} of each replica fetcher thread to check whether any partition is still assigned to that fetcher, and it performs this check sequentially across the fetcher threads. For example, in a cluster with 20 brokers and num.replica.fetchers set to 32, if each fetcher thread holds its lock a little longer, the total time for the request handler thread to finish shutdownIdleFetcherThread can grow substantially because it waits on the partitionMapLock of every fetcher thread in turn. If the LeaderAndIsr request gets blocked in the broker for more than request.timeout.ms (default 30s), the request send thread on the controller side will time out waiting for the response, establish a new connection to the broker, and re-send the request, which breaks in-order delivery because we then have more than one channel talking to the broker. Moreover, this can make the lock contention problem worse or saturate the request handler threads because duplicate control requests are sent to the broker multiple times. In our own testing, we saw up to *8 duplicate LeaderAndIsrRequests* being sent to the broker during a bounce, and the 99th percentile LeaderAndIsr local time went up to ~500s. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
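A simplified sketch of the sequential check described above (hypothetical types, not the actual AbstractFetcherManager code): the request handler thread has to take each fetcher's partitionMapLock in turn, so one busy fetcher delays the whole LeaderAndIsr request, and with 32 fetchers the delays add up 32 times.
{code:java}
import java.util.concurrent.locks.ReentrantLock

object FetcherShutdownSketch {
  class Fetcher {
    val partitionMapLock = new ReentrantLock()   // also held while processing fetch responses
    private var assignedPartitions = 0
    def hasPartitions: Boolean = {
      partitionMapLock.lock()                    // blocks until the fetcher releases the lock
      try assignedPartitions > 0 finally partitionMapLock.unlock()
    }
    def shutdown(): Unit = ()
  }

  // Called from the request handler thread at the end of LeaderAndIsr processing.
  def shutdownIdleFetcherThreads(fetchers: Seq[Fetcher]): Unit =
    fetchers.filterNot(_.hasPartitions).foreach(_.shutdown())
}
{code}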
[jira] [Created] (KAFKA-8571) Not complete delayed produce requests when processing StopReplicaRequest causing high produce latency for acks=all
Zhanxiang (Patrick) Huang created KAFKA-8571: Summary: Not complete delayed produce requests when processing StopReplicaRequest causing high produce latency for acks=all Key: KAFKA-8571 URL: https://issues.apache.org/jira/browse/KAFKA-8571 Project: Kafka Issue Type: Bug Reporter: Zhanxiang (Patrick) Huang Assignee: Zhanxiang (Patrick) Huang Currently a broker will only attempt to complete delayed requests upon high watermark changes and when receiving a LeaderAndIsrRequest. When a broker receives a StopReplicaRequest, it will not try to complete delayed operations, including delayed produce for acks=all, which can cause the producer to time out even though it would have talked to the new leader sooner if a NotLeaderForPartition error had been sent. This can happen during partition reassignment when the controller is trying to kick the previous leader out of the replica set. In this case, the controller will only send a StopReplicaRequest (not a LeaderAndIsrRequest) to the previous leader in the replica set shrink phase. Here is an example:
{noformat}
During reassignment of the replica set of partition A from [B1, B2] to [B2, B3]:
t0: Controller expands the replica set to [B1, B2, B3]
t1: B1 receives produce request PR on partition A with acks=all and timeout T. B1 puts PR into the DelayedProducePurgatory with timeout T.
t2: Controller elects B2 as the new leader and shrinks the replica set to [B2, B3]. LeaderAndIsrRequests are sent to B2 and B3. StopReplicaRequest is sent to B1.
t3: B1 receives StopReplicaRequest but doesn't try to complete PR.
If PR cannot be fulfilled by t3, and t1 + T > t3, PR will eventually time out in the purgatory and the producer will eventually time out the produce request.
{noformat}
Since it is possible for the leader to receive only a StopReplicaRequest (without receiving any LeaderAndIsrRequest) to leave the replica set, a fix for this issue is to also try to complete delayed operations when processing StopReplicaRequest. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
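A hedged sketch of the proposed fix (simplified, hypothetical types rather than the real ReplicaManager/DelayedOperationPurgatory APIs): when handling StopReplicaRequest, also attempt to complete delayed produce operations keyed on the removed partitions, so acks=all producers get an error response right away instead of timing out in the purgatory.
{code:java}
object StopReplicaSketch {
  trait ProducePurgatory {
    // Re-checks completion for all delayed produce requests watching this partition key.
    def checkAndComplete(topicPartitionKey: String): Int
  }

  def handleStopReplica(partitions: Seq[String],
                        stopReplica: String => Unit,
                        producePurgatory: ProducePurgatory): Unit = {
    partitions.foreach(stopReplica)
    // The missing step described in the ticket: complete delayed produce requests here too,
    // just as LeaderAndIsrRequest handling and high watermark changes already do.
    partitions.foreach(producePurgatory.checkAndComplete)
  }
}
{code}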
[jira] [Created] (KAFKA-8069) Committed offsets get cleaned up right after the coordinator loading them back from __consumer_offsets in broker with old inter-broker protocol version (< 2.2)
Zhanxiang (Patrick) Huang created KAFKA-8069: Summary: Committed offsets get cleaned up right after the coordinator loading them back from __consumer_offsets in broker with old inter-broker protocol version (< 2.2) Key: KAFKA-8069 URL: https://issues.apache.org/jira/browse/KAFKA-8069 Project: Kafka Issue Type: Bug Reporter: Zhanxiang (Patrick) Huang Assignee: Zhanxiang (Patrick) Huang After the 2.1 release, if the broker hasn't been upgraded to the latest inter-broker protocol version, the committed offsets stored in the __consumer_offsets topic will get cleaned up much earlier than they should be when the offsets are loaded back from the __consumer_offsets topic in the GroupCoordinator, which happens during leadership transitions or after a broker bounce. TL;DR: For the V1 on-disk format for __consumer_offsets, we have the *expireTimestamp* field, and if the inter-broker protocol (IBP) version is prior to 2.1 (prior to [KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets]) for a kafka 2.1 broker, the logic for getting the expired offsets looks like:
{code:java}
def getExpiredOffsets(baseTimestamp: CommitRecordMetadataAndOffset => Long): Map[TopicPartition, OffsetAndMetadata] = {
  offsets.filter {
    case (topicPartition, commitRecordMetadataAndOffset) =>
      ... && {
        commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp match {
          case None =>
            // current version with no per partition retention
            currentTimestamp - baseTimestamp(commitRecordMetadataAndOffset) >= offsetRetentionMs
          case Some(expireTimestamp) =>
            // older versions with explicit expire_timestamp field => old expiration semantics is used
            currentTimestamp >= expireTimestamp
        }
      }
  }
}
{code}
The expireTimestamp in the on-disk offset record can only be set when storing the committed offset in the __consumer_offsets topic. But the GroupCoordinator also has to keep an in-memory representation of the expireTimestamp (see the code above), which can be set in the following two cases:
# Upon the GroupCoordinator receiving an OffsetCommitRequest, the expireTimestamp is set using the following logic:
{code:java}
expireTimestamp = offsetCommitRequest.retentionTime match {
  case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
  case retentionTime => Some(currentTimestamp + retentionTime)
}
{code}
In all the latest client versions, the consumer will send the OffsetCommitRequest with DEFAULT_RETENTION_TIME, so the expireTimestamp will always be None in this case. *This means any committed offset set in this case will always hit the "case None" in "getExpiredOffsets(...)" when the coordinator is doing the cleanup, which is correct.*
# Upon the GroupCoordinator loading the committed offsets stored in the __consumer_offsets topic from disk, the expireTimestamp is set using the following logic if IBP < 2.1:
{code:java}
val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
{code}
and the logic to persist the expireTimestamp is:
{code:java}
// OffsetCommitRequest.DEFAULT_TIMESTAMP = -1
value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
{code}
Since the in-memory expireTimestamp will always be None in our case as mentioned in 1), we will always store -1 on disk. Therefore, when the offset is loaded from the __consumer_offsets topic, the in-memory expireTimestamp will always be set to -1.
*This means any committed offset set in this case will always hit "case Some(expireTimestamp)" in "getExpiredOffsets(...)" when the coordinator is doing the cleanup, which means we will always expire the committed offsets on the first expiration check (which is shortly after they are loaded from the __consumer_offsets topic)*. I am able to reproduce this bug on my local box with one broker using 2.*, 1.*, and 0.11.* consumers. The consumer will see a null committed offset after the broker is bounced. This bug was introduced by [PR-5690|https://github.com/apache/kafka/pull/5690] in the kafka 2.1 release, and the fix is very straightforward: set the expireTimestamp to None if it is -1 in the on-disk format (see the sketch below). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
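A minimal sketch of that fix (simplified and with hypothetical names, not the exact GroupMetadataManager code): when reading the V1 value back from __consumer_offsets, treat the sentinel -1 (OffsetCommitRequest.DEFAULT_TIMESTAMP) as "no per-offset expiration" instead of as a real expire timestamp, so the new retention semantics still apply after the offsets are reloaded.
{code:java}
object OffsetExpireTimestampFixSketch {
  val DefaultTimestamp: Long = -1L  // OffsetCommitRequest.DEFAULT_TIMESTAMP

  // -1 on disk means "no explicit expire timestamp", so map it back to None.
  def readExpireTimestamp(onDiskValue: Long): Option[Long] =
    if (onDiskValue == DefaultTimestamp) None else Some(onDiskValue)
}

// OffsetExpireTimestampFixSketch.readExpireTimestamp(-1L) == None   -> falls into "case None" (new semantics)
// OffsetExpireTimestampFixSketch.readExpireTimestamp(1678000000000L).isDefined -> keeps the old explicit expiration
{code}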
[jira] [Created] (KAFKA-7854) Behavior change in controller picking up partition reassignment tasks since 1.1.0
Zhanxiang (Patrick) Huang created KAFKA-7854: Summary: Behavior change in controller picking up partition reassignment tasks since 1.1.0 Key: KAFKA-7854 URL: https://issues.apache.org/jira/browse/KAFKA-7854 Project: Kafka Issue Type: Improvement Components: controller Reporter: Zhanxiang (Patrick) Huang After [https://github.com/apache/kafka/pull/4143], the controller no longer subscribes to data changes on /admin/reassign_partitions (in order to avoid unnecessarily loading the reassignment data again after the controller updates the znode), as opposed to previous kafka versions. However, there are systems built around kafka that rely on the previous behavior to incrementally update the list of partition reassignments, since kafka does not natively support that. For example, [cruise control|https://github.com/linkedin/cruise-control] can rely on the previous behavior (the controller listening to data changes) to maintain the reassignment concurrency by dynamically updating the data in the reassignment znode instead of waiting for the current batch to finish and doing reassignment batch by batch, which can significantly reduce the rebalance time in production clusters. Although directly updating the znode can be viewed as an anti-pattern in the long term, it is necessary since kafka does not natively support incrementally submitting more reassignment tasks. However, after our kafka clusters migrated from 0.11 to 2.0, cruise control no longer works because the controller behavior has changed. This reveals the following problems:
* These behavior changes may be viewed as internal changes, so compatibility is not guaranteed, but I think by convention people do treat this as a public interface and rely on its compatibility. In this case, I think we should clearly document the data contract for the partition reassignment task to avoid misuse and to avoid making controller changes that break the defined data contract. There may be other cases (e.g. topic deletion) whose data contracts need to be clearly defined, and we should keep this in mind when making controller changes.
* Kafka does not natively support incrementally submitting more reassignment tasks. If we do want to support that nicely, we should consider changing how we store the reassignment data: store the data in child nodes and let the controller listen for child node changes, similar to what we do for /admin/delete_topics (see the sketch below).
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
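A hedged sketch of the child-znode idea from the second bullet (the /admin/reassignment_requests path is hypothetical, not an actual Kafka znode): each submission is written as a separate sequential child, so the controller could watch for child changes the same way it watches /admin/delete_topics, instead of clients rewriting one shared znode.
{code:java}
import org.apache.zookeeper.{CreateMode, ZooKeeper}
import org.apache.zookeeper.ZooDefs.Ids

object SubmitReassignmentSketch {
  def main(args: Array[String]): Unit = {
    // Assumes the parent znode /admin/reassignment_requests already exists.
    val zk = new ZooKeeper("localhost:2181", 30000, null)
    val json = """{"version":1,"partitions":[{"topic":"t","partition":0,"replicas":[2,3]}]}"""
    // Each request becomes its own sequential child node that the controller can pick up.
    val path = zk.create("/admin/reassignment_requests/req-",
      json.getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL)
    println(s"Submitted reassignment request at $path")
    zk.close()
  }
}
{code}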
[jira] [Resolved] (KAFKA-7452) Deleting snapshot files after check-pointing log recovery offsets can slow down replication when truncation happens
[ https://issues.apache.org/jira/browse/KAFKA-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhanxiang (Patrick) Huang resolved KAFKA-7452. -- Resolution: Duplicate KAFKA-7557 fixed this. > Deleting snapshot files after check-pointing log recovery offsets can slow down replication when truncation happens > --- > > Key: KAFKA-7452 > URL: https://issues.apache.org/jira/browse/KAFKA-7452 > Project: Kafka > Issue Type: Bug > Affects Versions: 1.0.0, 1.0.1, 1.1.0, 1.1.1, 2.0.0 > Reporter: Zhanxiang (Patrick) Huang > Assignee: Zhanxiang (Patrick) Huang > Priority: Major -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7537) Only include live brokers in the UpdateMetadataRequest sent to existing brokers if there is no change in the partition states
Zhanxiang (Patrick) Huang created KAFKA-7537: Summary: Only include live brokers in the UpdateMetadataRequest sent to existing brokers if there is no change in the partition states Key: KAFKA-7537 URL: https://issues.apache.org/jira/browse/KAFKA-7537 Project: Kafka Issue Type: Improvement Components: controller Reporter: Zhanxiang (Patrick) Huang Assignee: Zhanxiang (Patrick) Huang Currently, when brokers join/leave the cluster without any partition state changes, the controller will send out UpdateMetadataRequests containing the states of all partitions to all brokers. But for existing brokers in the cluster, the metadata diff between the controller and the broker should only be the "live_brokers" info. Only the brokers with an empty metadata cache need the full UpdateMetadataRequest. Sending the full UpdateMetadataRequest to all brokers can place non-negligible memory pressure on the controller side. Let's say in total we have N brokers and M partitions in the cluster, and we want to add 1 brand new broker to the cluster. With RF=2, the memory footprint per partition in the UpdateMetadataRequest is ~200 bytes. In the current controller implementation, if each of the N RequestSendThreads serializes and sends out the UpdateMetadataRequest at roughly the same time (which is very likely the case), we will end up using *(N+1)*M*200B*. In a large kafka cluster, we can have:
{noformat}
N=99 M=100k
Memory usage to send out UpdateMetadataRequest to all brokers: 100 * 100K * 200B = 2G

However, we only need to send the full UpdateMetadataRequest to the newly added broker. We only need to include the live broker ids (4B * 100 brokers) in the UpdateMetadataRequest sent to the existing 99 brokers. So the amount of data that is actually needed will be:
1 * 100K * 200B + 99 * (100 * 4B) = ~21M

We can potentially reduce the memory footprint, as well as the data transferred over the network, by 2G / 21M = ~95x.
{noformat}
This issue hurts the scalability of a kafka cluster. KIP-380 and KAFKA-7186 also help to further reduce the controller memory footprint. In terms of implementation, we can keep some in-memory state on the controller side to differentiate existing brokers from uninitialized brokers (e.g. brand new brokers) so that, if there is no change in partition states, we only send the live broker info to existing brokers (see the sketch below). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
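A hedged sketch of that implementation idea (hypothetical types, not the actual ControllerChannelManager code): remember which brokers have already received a full UpdateMetadataRequest, and when only broker liveness changed, send them just the live broker list while new (empty-cache) brokers still get the full partition state.
{code:java}
import scala.collection.mutable

object UpdateMetadataSketch {
  // Simplified stand-in for the real request: live broker ids plus (possibly empty) partition states.
  case class UpdateMetadata(liveBrokerIds: Set[Int], partitionStates: Map[String, String])

  private val initializedBrokers = mutable.Set.empty[Int]

  def buildRequests(allBrokerIds: Set[Int],
                    liveBrokerIds: Set[Int],
                    partitionStates: Map[String, String],
                    partitionStatesChanged: Boolean): Map[Int, UpdateMetadata] = {
    val requests = allBrokerIds.map { brokerId =>
      // Brokers that never got a full request (e.g. brand new brokers) still need everything.
      val needsFullState = partitionStatesChanged || !initializedBrokers.contains(brokerId)
      val states = if (needsFullState) partitionStates else Map.empty[String, String]
      brokerId -> UpdateMetadata(liveBrokerIds, states)
    }.toMap
    initializedBrokers ++= allBrokerIds
    requests
  }
}
{code}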
[jira] [Created] (KAFKA-7464) Fail to shutdown ReplicaManager during broker cleaned shutdown
Zhanxiang (Patrick) Huang created KAFKA-7464: Summary: Fail to shutdown ReplicaManager during broker cleaned shutdown Key: KAFKA-7464 URL: https://issues.apache.org/jira/browse/KAFKA-7464 Project: Kafka Issue Type: Bug Reporter: Zhanxiang (Patrick) Huang Assignee: Zhanxiang (Patrick) Huang In a 2.0 deployment, we saw the following log when shutting down the ReplicaManager during a clean broker shutdown:
{noformat}
2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null
java.lang.IllegalArgumentException: null
at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121]
at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121]
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) ~[?:1.8.0_121]
at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214) ~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164) ~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) ~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) ~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.Selector.doClose(Selector.java:751) ~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.Selector.close(Selector.java:739) ~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.Selector.close(Selector.java:701) ~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.Selector.close(Selector.java:315) ~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) ~[kafka-clients-2.0.0.22.jar:?]
at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107) ~[kafka_2.11-2.0.0.22.jar:?]
at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108) ~[kafka_2.11-2.0.0.22.jar:?]
at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183) ~[kafka_2.11-2.0.0.22.jar:?]
at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?]
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) ~[scala-library-2.11.12.jar:?]
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) ~[scala-library-2.11.12.jar:?]
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) ~[scala-library-2.11.12.jar:?]
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) ~[scala-library-2.11.12.jar:?]
at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?]
at kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) ~[kafka_2.11-2.0.0.22.jar:?]
at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) ~[kafka_2.11-2.0.0.22.jar:?]
at kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?]
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) ~[kafka_2.11-2.0.0.22.jar:?]
at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?]
{noformat}
After that, we noticed that some of the replica fetcher threads failed to shut down:
{noformat}
2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] [ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches
java.nio.channels.ClosedChannelException: null
at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) ~[?:1.8.0_121]
at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) ~[?:1.8.0_121]
at org.apache.kafka.common.record.FileRecords.truncateTo(FileRecords.java:244) ~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.record.FileRecords.trim(FileRecords.java:206) ~[kafka-clients-2.0.0.22.jar:?]
at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:512) ~[kafka_2.11-2.0.0.22.jar:?]
at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) ~[kafka_2.11-2.0.0.22.jar:?] at
{noformat}
[jira] [Created] (KAFKA-7459) Concurrency bug in updating RequestsPerSec metric
Zhanxiang (Patrick) Huang created KAFKA-7459: Summary: Concurrency bug in updating RequestsPerSec metric Key: KAFKA-7459 URL: https://issues.apache.org/jira/browse/KAFKA-7459 Project: Kafka Issue Type: Bug Affects Versions: 2.0.0, 2.0.1 Reporter: Zhanxiang (Patrick) Huang Assignee: Zhanxiang (Patrick) Huang After KAFKA-6514, we added the API version as a tag for the RequestsPerSec metric, but in the implementation we use the non-threadsafe mutable.HashMap to store the version -> metric mapping without any protection ([https://github.com/apache/kafka/pull/4506/files#diff-d0332a0ff31df50afce3809d90505b25R357]). This can mess up the data structure and cause unexpected behavior ([https://github.com/scala/bug/issues/10436]). We should use ConcurrentHashMap instead. In our case, a clean shutdown of a 2.0 broker takes forever because of this concurrency bug leading to an infinite loop in HashMap resize. Thread-1 is doing the clean shutdown but is stuck waiting for one of the network threads to shut down:
{noformat}
"Thread-1" #25 prio=5 os_prio=0 tid=0x7f05c401 nid=0x79f4 waiting on condition [0x7f0597cfb000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0001ffad1500> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:282)
at kafka.network.Processor.shutdown(SocketServer.scala:873)
at kafka.network.Acceptor$$anonfun$shutdown$3.apply(SocketServer.scala:368)
at kafka.network.Acceptor$$anonfun$shutdown$3.apply(SocketServer.scala:368)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at kafka.network.Acceptor.shutdown(SocketServer.scala:368)
- locked <0x0001fdcf1000> (a kafka.network.Acceptor)
at kafka.network.SocketServer$$anonfun$stopProcessingRequests$2.apply(SocketServer.scala:178)
at kafka.network.SocketServer$$anonfun$stopProcessingRequests$2.apply(SocketServer.scala:178)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
at kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:178)
- locked <0x0001fba0e610> (a kafka.network.SocketServer)
at kafka.server.KafkaServer$$anonfun$shutdown$3.apply$mcV$sp(KafkaServer.scala:595)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86)
at kafka.server.KafkaServer.shutdown(KafkaServer.scala:595)
{noformat}
The network thread is always in HashTable.resize and never finishes updateRequestMetrics:
{noformat}
"kafka-network-thread-13673-ListenerName(SSL)-SSL-2" #201 prio=5 os_prio=0 tid=0x7f441dae4000 nid=0x4cdc runnable [0x7f2404189000]
java.lang.Thread.State: RUNNABLE
at scala.collection.mutable.HashTable$class.resize(HashTable.scala:268)
at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$addEntry0(HashTable.scala:157)
at scala.collection.mutable.HashTable$class.addEntry(HashTable.scala:148)
at scala.collection.mutable.HashMap.addEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.addEntry(HashMap.scala:93)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:79)
at kafka.network.RequestMetrics.requestRate(RequestChannel.scala:438)
at kafka.network.RequestChannel$Request$$anonfun$updateRequestMetrics$1.apply(RequestChannel.scala:161)
at kafka.network.RequestChannel$Request$$anonfun$updateRequestMetrics$1.apply(RequestChannel.scala:159)
at scala.collection.immutable.List.foreach(List.scala:392)
at kafka.network.RequestChannel$Request.updateRequestMetrics(RequestChannel.scala:159)
at kafka.network.Processor.kafka$network$Processor$$updateRequestMetrics(SocketServer.scala:740) at
{noformat}
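A minimal sketch of the proposed fix (the Meter type here is a stand-in, not the actual kafka.network.RequestMetrics code): replace the unsynchronized mutable.HashMap that maps API version -> meter with a ConcurrentHashMap, so concurrent network threads cannot corrupt the table and trigger the endless resize loop shown in the stack trace above.
{code:java}
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}

object RequestRateMetricsSketch {
  final class Meter(val name: String) {
    def mark(): Unit = ()  // stand-in for the real metrics meter
  }

  private val requestRateByVersion = new ConcurrentHashMap[Short, Meter]()

  private val newMeter: JFunction[Short, Meter] = new JFunction[Short, Meter] {
    override def apply(v: Short): Meter = new Meter(s"RequestsPerSec-v$v")
  }

  // computeIfAbsent is atomic, unlike mutable.HashMap.getOrElseUpdate.
  def requestRate(version: Short): Meter =
    requestRateByVersion.computeIfAbsent(version, newMeter)
}
{code}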
[jira] [Created] (KAFKA-7452) Deleting snapshot files after check-pointing log recovery offsets can slow down replication when truncation happens
Zhanxiang (Patrick) Huang created KAFKA-7452: Summary: Deleting snapshot files after check-pointing log recovery offsets can slow down replication when truncation happens Key: KAFKA-7452 URL: https://issues.apache.org/jira/browse/KAFKA-7452 Project: Kafka Issue Type: Bug Affects Versions: 2.0.0, 1.1.1, 1.1.0, 1.0.1, 1.0.0 Reporter: Zhanxiang (Patrick) Huang After KAFKA-5829, Kafka will try to iterate through all the partition dirs to delete useless snapshot files in "checkpointLogRecoveryOffsetsInDir". Currently, "checkpointLogRecoveryOffsetsInDir" is used in the following places:
# Truncation
# Log dir deletion and movement
# Background thread checkpointing recovery offsets
In a 2.0 deployment on a cluster with 10k partitions per broker, we found out that deleting useless snapshot files in the critical path of log truncation can significantly slow down followers catching up with the leader during a rolling bounce (~2x slower than 0.11). The reason is that we basically do an "ls -R" over the whole data directory only to potentially delete the snapshot files in one partition directory, because the way we identify snapshot files is to list the directories and check the filename suffix. In our case, "listSnapshotFiles" takes ~1ms per partition directory, so it takes ~1ms * 10k = ~10s just to delete the snapshot files for one partition after its truncation, which delays future fetches in the fetcher thread. Here are the related code snippets:
LogManager.scala
{code:java}
private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
  for {
    partitionToLog <- logsByDir.get(dir.getAbsolutePath)
    checkpoint <- recoveryPointCheckpoints.get(dir)
  } {
    try {
      checkpoint.write(partitionToLog.mapValues(_.recoveryPoint))
      allLogs.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
    } catch {
      case e: IOException =>
        logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath,
          s"Disk error while writing to recovery point " +
          s"file in directory $dir", e)
    }
  }
}
{code}
ProducerStateManager.scala
{code:java}
private[log] def listSnapshotFiles(dir: File): Seq[File] = {
  if (dir.exists && dir.isDirectory) {
    Option(dir.listFiles).map { files =>
      files.filter(f => f.isFile && isSnapshotFile(f)).toSeq
    }.getOrElse(Seq.empty)
  } else Seq.empty
}

private def deleteSnapshotFiles(dir: File, predicate: Long => Boolean = _ => true) {
  listSnapshotFiles(dir).filter(file => predicate(offsetFromFile(file))).foreach { file =>
    Files.deleteIfExists(file.toPath)
  }
}
{code}
There are a few things that can be optimized here:
# We can have an in-memory cache for the snapshot file metadata (filenames) in ProducerStateManager to avoid calling dir.listFiles in "deleteSnapshotFiles", "latestSnapshotFile" and "oldestSnapshotFile" (see the sketch below).
# After truncation, we can try to delete snapshot files only for the truncated partitions (in the replica fetcher thread, we truncate one partition at a time) instead of all partitions. Or maybe we don't even need to delete snapshot files in the critical path of truncation, because the background log-recovery-offset-checkpoint-thread will do it periodically. This can also apply to log deletion/movement.
# If we want to further optimize the actual snapshot file deletion, we can make it async. But I am not sure whether it is needed after we have 1) and 2).
Also, we notice that there is no way to disable transaction/exactly-once support on the broker side, given that it brings in some extra overhead even though we have no clients using this feature. Not sure whether this is a common use case, but it would be useful if we could have a switch to avoid the extra performance overhead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
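A hedged sketch of optimization 1) above (hypothetical names, not the actual ProducerStateManager code): cache the snapshot files per partition, keyed by offset, so deleting or looking up snapshots never needs dir.listFiles() on the truncation critical path.
{code:java}
import java.io.File
import java.nio.file.Files
import java.util.concurrent.ConcurrentSkipListMap

class SnapshotFileCacheSketch(dir: File) {
  private val snapshots = new ConcurrentSkipListMap[java.lang.Long, File]()

  // Populate the cache once, e.g. when the log is loaded, instead of listing on every deletion.
  Option(dir.listFiles()).getOrElse(Array.empty[File])
    .filter(f => f.isFile && f.getName.endsWith(".snapshot"))
    .foreach(f => snapshots.put(offsetFromFile(f), f))

  private def offsetFromFile(f: File): Long =
    f.getName.stripSuffix(".snapshot").toLong

  def latestSnapshotFile: Option[File] = Option(snapshots.lastEntry()).map(_.getValue)

  def oldestSnapshotFile: Option[File] = Option(snapshots.firstEntry()).map(_.getValue)

  // Delete snapshots strictly below the given offset without touching dir.listFiles().
  def deleteSnapshotsBefore(offset: Long): Unit = {
    val it = snapshots.headMap(offset, false).entrySet().iterator()
    while (it.hasNext) {
      Files.deleteIfExists(it.next().getValue.toPath)
      it.remove()
    }
  }
}
{code}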
[jira] [Created] (KAFKA-7283) mmap indexes lazily and skip sanity check for segments below recovery point
Zhanxiang (Patrick) Huang created KAFKA-7283: Summary: mmap indexes lazily and skip sanity check for segments below recovery point Key: KAFKA-7283 URL: https://issues.apache.org/jira/browse/KAFKA-7283 Project: Kafka Issue Type: New Feature Reporter: Zhanxiang (Patrick) Huang Assignee: Zhanxiang (Patrick) Huang This is a follow-up ticket for KIP-263. Currently the broker will mmap the index files, read the length as well as the last entry of each file, and sanity check the index files of all log segments in the log directory after the broker is started. These operations can be slow because the broker needs to open the index files and read data into the page cache. In this case, the time to restart a broker will increase in proportion to the number of segments in the log directory. Per the KIP discussion, we think we can skip the sanity check for segments below the recovery point, since Kafka does not provide guarantees for segments already flushed to disk, and sanity checking only the index file helps little when the segment is also corrupted because of disk failure. Therefore, we can make the following changes to improve broker startup time:
# Mmap the index file and populate the fields of the index file on demand rather than performing costly disk operations when creating the index object on broker startup (see the sketch below).
# Skip sanity checks on the indexes of segments below the recovery point.
With these changes, the broker startup time will increase only in proportion to the number of partitions in the log directory after a clean shutdown, because only active segments are mmapped and sanity checked. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
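A hedged sketch of change 1) (a hypothetical class, not the actual AbstractIndex code): defer the mmap and the "read length / last entry" work until the index is first accessed, so creating index objects for segments below the recovery point does no disk I/O at startup.
{code:java}
import java.io.{File, RandomAccessFile}
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel

class LazyOffsetIndexSketch(file: File, entrySize: Int = 8) {
  @volatile private var mmapOrNull: MappedByteBuffer = _

  // mmap happens only on first access, not when the index object is created.
  private def mmap: MappedByteBuffer = {
    if (mmapOrNull == null) synchronized {
      if (mmapOrNull == null) {
        val raf = new RandomAccessFile(file, "rw")
        try mmapOrNull = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, file.length())
        finally raf.close()
      }
    }
    mmapOrNull
  }

  // Only segments that are actually read or written (e.g. active ones) pay the mmap cost.
  def entries: Int = mmap.limit() / entrySize

  // Sanity check runs only if the index was materialized (e.g. for the active segment).
  def sanityCheckIfLoaded(): Unit =
    if (mmapOrNull != null)
      require(mmapOrNull.limit() % entrySize == 0, s"Corrupt index ${file.getAbsolutePath}")
}
{code}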
[jira] [Created] (KAFKA-7186) Controller uses too much memory when sending out UpdateMetadataRequest that can cause OutOfMemoryError
Zhanxiang (Patrick) Huang created KAFKA-7186: Summary: Controller uses too much memory when sending out UpdateMetadataRequest that can cause OutOfMemoryError Key: KAFKA-7186 URL: https://issues.apache.org/jira/browse/KAFKA-7186 Project: Kafka Issue Type: Bug Components: controller Reporter: Zhanxiang (Patrick) Huang Assignee: Zhanxiang (Patrick) Huang During controller failover and broker changes, the controller sends out UpdateMetadataRequests to all brokers in the cluster containing the states of all partitions and the live brokers. The current implementation will instantiate the UpdateMetadataRequest object and its serialized form (Struct) <# of brokers> times, which causes OOM if the memory exceeds the configured JVM heap size. We have seen this issue in the production environment multiple times. For example, if we have 100 brokers in the cluster and each broker is the leader of 2k partitions, the extra memory usage introduced by the controller trying to send out UpdateMetadataRequests is around: <memory footprint per partition> * <# of brokers> * <# of partitions> = 250B * 100 * 200k = 5GB -- This message was sent by Atlassian JIRA (v7.6.3#76005)