[jira] [Created] (KAFKA-15339) Transient I/O error while appending records could lead to the halt of the whole cluster
Haoze Wu created KAFKA-15339:

Summary: Transient I/O error while appending records could lead to the halt of the whole cluster
Key: KAFKA-15339
URL: https://issues.apache.org/jira/browse/KAFKA-15339
Project: Kafka
Issue Type: Improvement
Components: connect, producer
Affects Versions: 3.5.0
Reporter: Haoze Wu

We are running an integration test in which we start an embedded Connect cluster on the active 3.5 branch. Because of a transient disk error, we may encounter an IOException while appending records to one topic, as shown in the stack trace:

{code:java}
[2023-08-13 16:53:51,016] ERROR Error while appending records to connect-config-topic-connect-cluster-0 in dir /tmp/EmbeddedKafkaCluster8003464883598783225 (org.apache.kafka.storage.internals.log.LogDirFailureChannel:61)
java.io.IOException:
    at org.apache.kafka.common.record.MemoryRecords.writeFullyTo(MemoryRecords.java:92)
    at org.apache.kafka.common.record.FileRecords.append(FileRecords.java:188)
    at kafka.log.LogSegment.append(LogSegment.scala:161)
    at kafka.log.LocalLog.append(LocalLog.scala:436)
    at kafka.log.UnifiedLog.append(UnifiedLog.scala:853)
    at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:664)
    at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1281)
    at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1269)
    at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:977)
    at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
    at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
    at scala.collection.mutable.HashMap.map(HashMap.scala:35)
    at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:965)
    at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:623)
    at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:680)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:180)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76)
    at java.lang.Thread.run(Thread.java:748)
{code}

However, merely because appending records to one partition failed, the fetchers for all other partitions are removed, the broker is shut down, and finally the embedded Connect cluster is killed as a whole:

{code:java}
[2023-08-13 17:35:37,966] WARN Stopping serving logs in dir /tmp/EmbeddedKafkaCluster6777164631574762227 (kafka.log.LogManager:70)
[2023-08-13 17:35:37,968] ERROR Shutdown broker because all log dirs in /tmp/EmbeddedKafkaCluster6777164631574762227 have failed (kafka.log.LogManager:143)
[2023-08-13 17:35:37,968] WARN Abrupt service halt with code 1 and message null (org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster:130)
[2023-08-13 17:35:37,968] ERROR [LogDirFailureHandler]: Error due to (kafka.server.ReplicaManager$LogDirFailureHandler:135)
org.apache.kafka.connect.util.clusters.UngracefulShutdownException: Abrupt service halt with code 1 and message null
{code}

We are wondering whether we could add a configurable retry around the root cause to tolerate such transient I/O faults, so that if a retry succeeds, the embedded Connect cluster can keep operating. Any comments and suggestions would be appreciated.
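For discussion, here is a minimal, hypothetical sketch of what such a configurable retry could look like. The `TransientIoRetry` helper, its parameters, and the idea of wiring it around the append call are assumptions of ours, not existing Kafka code:

{code:java}
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical helper (not existing Kafka code): retry an I/O action a bounded
// number of times with a fixed backoff before surfacing the original IOException.
public final class TransientIoRetry {
    public static <T> T withRetry(Callable<T> ioAction, int maxAttempts, long backoffMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return ioAction.call();          // e.g. the record append
            } catch (IOException e) {
                last = e;                        // remember the most recent failure
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs);     // simple fixed backoff between attempts
                }
            }
        }
        throw last;                              // all attempts failed: propagate as before
    }
}
{code}

The intent is that the existing LogDirFailureChannel path would only be taken if every attempt fails; if a retry succeeds, the log directory is not marked offline and the broker keeps serving.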
[jira] [Created] (KAFKA-14886) Broker request handler thread pool is full due to a single request slowdown
Haoze Wu created KAFKA-14886:

Summary: Broker request handler thread pool is full due to a single request slowdown
Key: KAFKA-14886
URL: https://issues.apache.org/jira/browse/KAFKA-14886
Project: Kafka
Issue Type: Improvement
Affects Versions: 2.8.0
Reporter: Haoze Wu

In Kafka 2.8.0, we found that the number of data-plane Kafka request handlers may quickly reach its limit when only one request is stuck. As a result, all other requests that require a data-plane request handler get stuck as well.

When there is a slowdown inside the storeOffsets call at line 777, e.g., due to an I/O operation, the thread keeps holding the lock acquired at line 754:

{code:java}
private def doCommitOffsets(group: GroupMetadata,
                            memberId: String,
                            groupInstanceId: Option[String],
                            generationId: Int,
                            offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                            responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
  group.inLock { // Line 754
    ..
    groupManager.storeOffsets() // Line 777
    ..
  }
}
{code}

Its call stack is:

{code:java}
kafka.coordinator.group.GroupMetadata,inLock,227
kafka.coordinator.group.GroupCoordinator,handleCommitOffsets,755
kafka.server.KafkaApis,handleOffsetCommitRequest,515
kafka.server.KafkaApis,handle,175
kafka.server.KafkaRequestHandler,run,74
java.lang.Thread,run,748
{code}

This happens when the broker is handling the commit offset request from a consumer. When the slowdown mentioned above leaves the consumer without a response, the consumer automatically resends the request to the broker. Note that each request from the consumer is handled by a data-plane-kafka-request-handler thread. Therefore, another data-plane-kafka-request-handler thread will also get stuck at line 754 when handling the retried request, because it tries to acquire the very same consumer group lock. The retries occur repeatedly, and none of them can succeed. As a result, the pool of data-plane-kafka-request-handler threads fills up. Since this pool is responsible for handling such requests from all producers and consumers, all producers and consumers are affected.

The backoff mechanism might mitigate this issue by reducing the number of requests sent in a short time and thereby reserving more slots in the thread pool. Therefore, we increased the backoff config "retry.backoff.ms" in the consumer's config from 100 ms (the default) to 1000 ms to see whether the issue disappears. However, we found that the thread pool becomes full again, because multiple heartbeat requests take up the slots of this thread pool. The handling of all those heartbeat requests is stuck on acquiring the same consumer group lock, which has already been acquired at line 754 as mentioned. Specifically, the heartbeat handling is stuck at GroupCoordinator.handleHeartbeat@624:

{code:java}
def handleHeartbeat(groupId: String,
                    memberId: String,
                    groupInstanceId: Option[String],
                    generationId: Int,
                    responseCallback: Errors => Unit): Unit = {
  ..
  case Some(group) => group.inLock { // Line 624
    ..
  }
  ..
}
{code}

Heartbeat requests are sent from the consumer at an interval of 3000 ms (by default) and have no backoff mechanism, so the data-plane-kafka-request-handler thread pool soon fills up again.

Fix: Instead of waiting for the lock, we can just try to acquire the lock (probably with a time limit). If the acquisition fails, this request can be discarded so that other requests (which include the retry of the discarded one) can be processed. A rough sketch of this idea follows below.
However, we feel this fix would affect the semantics of many operations. We would like to hear some suggestions from the community.
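To make the proposal concrete, here is a minimal Java sketch of the try-lock-with-timeout idea; `groupLock`, `doCommit`, and the timeout are stand-ins for illustration, not actual GroupCoordinator/GroupMetadata members:

{code:java}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the proposed fix: bound the wait for the consumer group
// lock so one stuck holder cannot pin every request handler thread.
public final class TryLockSketch {
    public static boolean handleWithTimeout(ReentrantLock groupLock, Runnable doCommit, long timeoutMs)
            throws InterruptedException {
        if (!groupLock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
            // Could not get the group lock in time: discard this request so the
            // handler thread is freed; the client is expected to retry later.
            return false;
        }
        try {
            doCommit.run();   // the work that would otherwise run under group.inLock
            return true;
        } finally {
            groupLock.unlock();
        }
    }
}
{code}

How the discarded request should be answered (an explicit error so the client backs off, versus no response at all) is part of the semantics concern raised above.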
[jira] [Created] (KAFKA-14882) Inconsistent topic state between ZooKeeper nodes and Kafka brokers causes TopicExistsException at the client
Haoze Wu created KAFKA-14882:

Summary: Inconsistent topic state between ZooKeeper nodes and Kafka brokers causes TopicExistsException at the client
Key: KAFKA-14882
URL: https://issues.apache.org/jira/browse/KAFKA-14882
Project: Kafka
Issue Type: Improvement
Affects Versions: 2.8.0
Reporter: Haoze Wu

We have been testing Kafka 2.8.0. We found some scenarios in which TopicExistsException occurs, and we feel the design of the topic creation process in Kafka may sometimes confuse users. When a client sends a topic create request to a Kafka broker, the following steps happen:
# AdminManager checks the topic path in ZooKeeper and throws TopicExistsException if the topic already exists (Kafka sends a request to ZooKeeper).
# AdminManager creates the topic path in ZooKeeper (Kafka sends a request to ZooKeeper).
# The controller's ZooKeeper request watcher detects the change and enqueues the corresponding event (the ZooKeeper watcher sends a message to Kafka).
# The event is taken off the queue and executed (the controller broker sends a LeaderAndIsrRequest to the brokers, possibly including itself).
# The broker handles the request; a retry from the client goes back to step #1.

A symptom we observed is that when step #4 is delayed (stuck for some reason), the client may retry (send the topic create request again), which triggers TopicExistsException in step #1. However, the topic create request should behave like a transaction: it should have some atomicity and also be robust under concurrent topic creation. After some inspection, we found that it is not easy to implement such a feature given the current implementation. But we do have the complaint that the client gets TopicExistsException when the topic does not actually exist or is not ready.

We suggest that we could at least provide some utility that helps users mitigate this issue, for example a tool that helps users clean up the ZooKeeper data and check the consistency of the topic metadata (see the sketch below). We are waiting for feedback from the community. We can provide concrete cases, reproduction scripts, and analysis of the workload if needed.
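As a rough illustration of such a utility (assumed, not an existing Kafka tool), the sketch below lists the topics the brokers report and the topic znodes under /brokers/topics in ZooKeeper, and prints any entry that exists only in ZooKeeper; the bootstrap and ZooKeeper addresses are placeholders:

{code:java}
import java.util.List;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.zookeeper.ZooKeeper;

// Hypothetical consistency check: compare the topics the brokers know about with
// the topic znodes in ZooKeeper and report any mismatch.
public final class TopicMetadataCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            Set<String> brokerView = admin.listTopics().names().get();
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30_000, event -> { }); // placeholder
            try {
                List<String> zkView = zk.getChildren("/brokers/topics", false);
                // Topic znodes that no broker reports are candidates for the stale
                // metadata that causes the spurious TopicExistsException.
                for (String topic : zkView) {
                    if (!brokerView.contains(topic)) {
                        System.out.println("Present in ZooKeeper only: " + topic);
                    }
                }
            } finally {
                zk.close();
            }
        }
    }
}
{code}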
[jira] [Created] (KAFKA-13538) Unexpected TopicExistsException related to Admin#createTopics after broker crash
Haoze Wu created KAFKA-13538:

Summary: Unexpected TopicExistsException related to Admin#createTopics after broker crash
Key: KAFKA-13538
URL: https://issues.apache.org/jira/browse/KAFKA-13538
Project: Kafka
Issue Type: Bug
Components: build
Affects Versions: 2.8.0
Reporter: Haoze Wu

We were using the official Kafka Java API to create a topic in a Kafka broker cluster (3 brokers):

{code:java}
CreateTopicsResult result = admin.createTopics(...);
... = result.all().get();
{code}

The topic we create always has replication factor = 2 and partitions = 2. If one of the brokers crashes for some reason and the client happens to be creating a topic through this crashed broker, we usually observe that the client suffers a delay of a few seconds due to the disconnection, and then automatically connects to another broker and creates the topic there. Everything is done automatically by the client, inside `admin.createTopics(...)` and `result.all().get()`.

However, we found that sometimes we got `TopicExistsException` from `result.all().get()`, even though we had never created this topic beforehand. After some investigation of the client source code, we found that this issue happens in the following way:
# The client connects to a broker (say, broker X) and then sends the topic creation request.
# This topic has replication factor = 2 and partitions = 2, so broker X may inform another broker of this information.
# Broker X suddenly crashes for some reason, and the response for the topic creation request is never sent back to the client.
# The client eventually learns that broker X crashed, but never gets the response for the topic creation request. The client therefore thinks the topic creation request failed, connects to another broker (say, broker Y), and sends the topic creation request again.
# The original topic creation request (with replication factor = 2 and partitions = 2) had been partially executed before broker X crashed, so broker Y may already have done some of the work requested by broker X. For example, broker Y may hold some metadata about this topic. Therefore, when broker Y does a sanity check against this metadata, it finds that the topic exists and directly returns `TopicExistsException` as the response.
# The client receives `TopicExistsException` and takes it at face value, so the exception is thrown back to the user from `result.all().get()`.

There are two diagrams illustrating these six steps.

Now the core question is whether this workflow violates the semantics and design of the Kafka client API. We read the "Create Topics Response" section in KIP-4 ([https://cwiki.apache.org/confluence/display/kafka/kip-4+-+command+line+and+centralized+administrative+operations]). The description in KIP-4 focuses on batch topic creation requests and how the creations work independently; it does not discuss how the client should deal with the buggy scenario above. According to "common sense", we think the client should be able to recognize that the metadata existing on broker Y was actually created by the client itself via the crashed broker X, and should not throw `TopicExistsException` to the user.
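One possible client-side workaround (our assumption, not an official recommendation or existing API behavior) is to treat a `TopicExistsException` from a retried create as success after verifying that the topic matches what was requested:

{code:java}
import java.util.Collections;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.errors.TopicExistsException;

// Hypothetical workaround sketch: if a (possibly internally retried) create fails
// with TopicExistsException, describe the topic and accept it when it looks like
// what we asked for, instead of surfacing the exception to the user.
public final class CreateTopicIdempotently {
    public static void create(Admin admin, String name, int partitions, short replicationFactor) throws Exception {
        try {
            admin.createTopics(Collections.singleton(new NewTopic(name, partitions, replicationFactor)))
                 .all().get();
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw e; // unrelated failure: propagate
            }
            // The topic "exists", possibly because our own first attempt was partially
            // applied before broker X crashed; verify it matches what we requested.
            TopicDescription description = admin.describeTopics(Collections.singleton(name))
                                                .all().get().get(name);
            if (description.partitions().size() != partitions) {
                throw e; // genuinely a different, pre-existing topic
            }
        }
    }
}
{code}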
[jira] [Created] (KAFKA-13468) Consumers may hang because an IOException during Log initialization does not trigger KafkaStorageException
Haoze Wu created KAFKA-13468:

Summary: Consumers may hang because an IOException during Log initialization does not trigger KafkaStorageException
Key: KAFKA-13468
URL: https://issues.apache.org/jira/browse/KAFKA-13468
Project: Kafka
Issue Type: Bug
Components: log
Affects Versions: 2.8.0
Reporter: Haoze Wu

When the Kafka Log class (`core/src/main/scala/kafka/log/Log.scala`) is initialized, it may encounter an IO exception in the `locally` block, e.g., when the log directory cannot be created due to a permission issue, or when an IOException occurs in `initializeLeaderEpochCache`, `initializePartitionMetadata`, etc.

{code:java}
class Log(...) {
  // ...
  locally {
    // create the log directory if it doesn't exist
    Files.createDirectories(dir.toPath)
    initializeLeaderEpochCache()
    initializePartitionMetadata()
    val nextOffset = loadSegments()
    // ...
  }
  // ...
}
{code}

We found that the broker encountering the IO exception prints a KafkaApi error log like the following and proceeds:

{code:java}
[2021-11-17 22:41:30,057] ERROR [KafkaApi-1] Error when handling request: clientId=1, correlationId=1, api=LEADER_AND_ISR, version=5, body=LeaderAndIsrRequestData(controllerId=1, controllerEpoch=1, brokerEpoch=4294967362, type=0, ungroupedPartitionStates=[], topicStates=[LeaderAndIsrTopicState(topicName='gray-2-0', topicId=573bAVHfRQeXApzAKevNIg, partitionStates=[LeaderAndIsrPartitionState(topicName='gray-2-0', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1, 3], zkVersion=0, replicas=[1, 3], addingReplicas=[], removingReplicas=[], isNew=true)]), LeaderAndIsrTopicState(topicName='gray-1-0', topicId=12dW2FxLTiyKmGi41HhdZQ, partitionStates=[LeaderAndIsrPartitionState(topicName='gray-1-0', partitionIndex=1, controllerEpoch=1, leader=3, leaderEpoch=0, isr=[3, 1], zkVersion=0, replicas=[3, 1], addingReplicas=[], removingReplicas=[], isNew=true)]), LeaderAndIsrTopicState(topicName='gray-3-0', topicId=_yvmANyZSoK_PTV0e-nqCA, partitionStates=[LeaderAndIsrPartitionState(topicName='gray-3-0', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1, 3], zkVersion=0, replicas=[1, 3], addingReplicas=[], removingReplicas=[], isNew=true)])], liveLeaders=[LeaderAndIsrLiveLeader(brokerId=1, hostName='localhost', port=9791), LeaderAndIsrLiveLeader(brokerId=3, hostName='localhost', port=9793)]) (kafka.server.RequestHandlerHelper)
{code}

But none of the consumers that are consuming data from the affected topics ("gray-2-0", "gray-1-0", "gray-3-0") are able to proceed. These consumers do not emit any error log related to this issue; they hang for more than 3 minutes.
The IOException sometimes affects multiple offset topics:

{code:java}
[2021-11-18 10:57:41,289] ERROR [KafkaApi-1] Error when handling request: clientId=1, correlationId=11, api=LEADER_AND_ISR, version=5, body=LeaderAndIsrRequestData(controllerId=1, controllerEpoch=1, brokerEpoch=4294967355, type=0, ungroupedPartitionStates=[], topicStates=[LeaderAndIsrTopicState(topicName='__consumer_offsets', topicId=_MiMTCViS76osIyDdxekIg, partitionStates=[LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=15, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=48, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=45, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), ... addingReplicas=[], removingReplicas=[], isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=33, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true)])], liveLeaders=[LeaderAndIsrLiveLeader(brokerId=1, hostName='localhost', port=9791)]) (kafka.server.RequestHandlerHelper)
{code}

*Analysis*

The key stack trace is as follows:

{code:java}
"java.lang.Thread,run,748",
"kafka.server.KafkaRequestHandler,run,74",
"kafka.server.KafkaApis,handle,236",
"kafka.server.KafkaApis,handleLeaderAndIsrRequest,258",
"kafka.server.ReplicaManager,becomeLeaderOrFollower,1411",
"kafka.server.ReplicaManager,makeLeaders,1566",
"scala.collection.mutable.HashMap,foreachEntry,499",
"scala.collection.mutable.HashMap$Node,foreachEntry,633",
"kafka.utils.Implicits$MapExtensionMethods$,$anonfun$forKeyValue$1,62",
"kafka.server.ReplicaManager,$anonfun$makeLeaders$5,1568",
"kafka.cluster.Partition,makeLeader,548",
"kafka.cluster.Partition,$anonfun$makeLeader$1,564",
"kafka.cluster.Partition,createLogIfNotExists,324",
{code}
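For illustration only, the sketch below shows the kind of handling the summary suggests, written in Java rather than the actual Scala of Log.scala: wrap an IOException thrown during log initialization in a KafkaStorageException so the failure is surfaced to the log directory failure handling instead of being silently ignored.

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.kafka.common.errors.KafkaStorageException;

// Illustrative sketch only (the real logic lives in the Scala `locally` block of
// Log.scala): wrap I/O failures during log initialization in KafkaStorageException
// so the caller can mark the log directory offline instead of proceeding silently.
public final class LogInitSketch {
    public static void initLogDir(Path dir) {
        try {
            Files.createDirectories(dir);  // may fail, e.g. on a permission problem
            // initializeLeaderEpochCache(), initializePartitionMetadata(), loadSegments()
            // would follow here in the real code.
        } catch (IOException e) {
            throw new KafkaStorageException("Error while initializing log dir " + dir, e);
        }
    }
}
{code}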
[jira] [Created] (KAFKA-13457) socketChannel in Acceptor#accept is not closed upon IOException
Haoze Wu created KAFKA-13457:

Summary: socketChannel in Acceptor#accept is not closed upon IOException
Key: KAFKA-13457
URL: https://issues.apache.org/jira/browse/KAFKA-13457
Project: Kafka
Issue Type: Bug
Components: network
Affects Versions: 2.8.0
Reporter: Haoze Wu

When kafka.network.Acceptor in SocketServer.scala accepts a new connection in the `accept` function, it handles `TooManyConnectionsException` and `ConnectionThrottledException`. However, the accept at line 717 or the socketChannel operations within the try block may throw an IOException as well, which is not handled there:

{code:java}
//core/src/main/scala/kafka/network/SocketServer.scala
// Acceptor class
private def accept(key: SelectionKey): Option[SocketChannel] = {
  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
  val socketChannel = serverSocketChannel.accept() // line 717
  try {
    connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setKeepAlive(true)
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
      socketChannel.socket().setSendBufferSize(sendBufferSize)
    Some(socketChannel)
  } catch {
    case e: TooManyConnectionsException =>
      info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
      close(endPoint.listenerName, socketChannel)
      None
    case e: ConnectionThrottledException =>
      val ip = socketChannel.socket.getInetAddress
      debug(s"Delaying closing of connection from $ip for ${e.throttleTimeMs} ms")
      val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
      throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
      None
  }
}
{code}

The thrown IOException is caught by the caller `acceptNewConnections` at line 706, which only prints an error message. The socketChannel that threw this IOException is never closed.

{code:java}
//core/src/main/scala/kafka/network/SocketServer.scala
private def acceptNewConnections(): Unit = {
  val ready = nioSelector.select(500)
  if (ready > 0) {
    val keys = nioSelector.selectedKeys()
    val iter = keys.iterator()
    while (iter.hasNext && isRunning) {
      try {
        val key = iter.next
        iter.remove()
        if (key.isAcceptable) {
          accept(key).foreach { socketChannel =>
            ...
            do {
              ...
            } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
          }
        } else
          throw new IllegalStateException("Unrecognized key state for acceptor thread.")
      } catch {
        case e: Throwable => error("Error while accepting connection", e) // line 706
      }
    }
  }
}
{code}

We found during testing that this causes our Kafka clients to experience errors (InvalidReplicationFactorException) for 40+ seconds when creating new topics; after 40 seconds, the clients are able to create new topics successfully again. We verified that after adding socketChannel.close() upon the IOException, the symptom disappears, so the clients no longer need to wait roughly 40 seconds before working again.
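For reference, here is a minimal Java NIO sketch of the proposed fix (the real Acceptor is Scala in SocketServer.scala): if configuring the accepted channel throws an IOException, close the channel before propagating the error so it is not leaked.

{code:java}
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Optional;

// Illustrative sketch of the proposed fix, not the actual Kafka code: make sure a
// half-configured channel is closed when an unexpected IOException occurs.
public final class AcceptSketch {
    public static Optional<SocketChannel> accept(ServerSocketChannel server) throws IOException {
        SocketChannel channel = server.accept();
        if (channel == null) {
            return Optional.empty();   // nothing pending on a non-blocking server channel
        }
        try {
            channel.configureBlocking(false);
            channel.socket().setTcpNoDelay(true);
            channel.socket().setKeepAlive(true);
            return Optional.of(channel);
        } catch (IOException e) {
            channel.close();           // the missing cleanup: do not leak the channel
            throw e;                   // let the caller log "Error while accepting connection"
        }
    }
}
{code}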