[jira] [Created] (KAFKA-15339) Transient I/O error happening in appending records could lead to the halt of the whole cluster

2023-08-13 Thread Haoze Wu (Jira)
Haoze Wu created KAFKA-15339:


 Summary: Transient I/O error happening in appending records could lead to the halt of the whole cluster
 Key: KAFKA-15339
 URL: https://issues.apache.org/jira/browse/KAFKA-15339
 Project: Kafka
  Issue Type: Improvement
  Components: connect, producer 
Affects Versions: 3.5.0
Reporter: Haoze Wu


We are running an integration test that starts an EmbeddedConnectCluster on the active 3.5 branch. Because of a transient disk error, we may encounter an IOException while appending records to one topic, as shown in the stack trace:
{code:java}
[2023-08-13 16:53:51,016] ERROR Error while appending records to connect-config-topic-connect-cluster-0 in dir /tmp/EmbeddedKafkaCluster8003464883598783225 (org.apache.kafka.storage.internals.log.LogDirFailureChannel:61)
java.io.IOException: 
        at org.apache.kafka.common.record.MemoryRecords.writeFullyTo(MemoryRecords.java:92)
        at org.apache.kafka.common.record.FileRecords.append(FileRecords.java:188)
        at kafka.log.LogSegment.append(LogSegment.scala:161)
        at kafka.log.LocalLog.append(LocalLog.scala:436)
        at kafka.log.UnifiedLog.append(UnifiedLog.scala:853)
        at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:664)
        at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1281)
        at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1269)
        at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:977)
        at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
        at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
        at scala.collection.mutable.HashMap.map(HashMap.scala:35)
        at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:965)
        at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:623)
        at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:680)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:180)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76)
        at java.lang.Thread.run(Thread.java:748) {code}
However, merely because appending records to one partition failed, the fetchers for all the other partitions are removed, the broker shuts down, and finally the embedded Connect cluster is killed as a whole.
{code:java}
[2023-08-13 17:35:37,966] WARN Stopping serving logs in dir /tmp/EmbeddedKafkaCluster6777164631574762227 (kafka.log.LogManager:70)
[2023-08-13 17:35:37,968] ERROR Shutdown broker because all log dirs in /tmp/EmbeddedKafkaCluster6777164631574762227 have failed (kafka.log.LogManager:143)
[2023-08-13 17:35:37,968] WARN Abrupt service halt with code 1 and message null (org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster:130)
[2023-08-13 17:35:37,968] ERROR [LogDirFailureHandler]: Error due to (kafka.server.ReplicaManager$LogDirFailureHandler:135)
org.apache.kafka.connect.util.clusters.UngracefulShutdownException: Abrupt service halt with code 1 and message null {code}
I am wondering if we could add a configurable retry around the failing write to tolerate transient I/O faults, so that if the retry succeeds the embedded Connect cluster can keep operating.
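For illustration only, a bounded-retry helper around the failing write could look like the sketch below. The `maxRetries` and `backoffMs` knobs are hypothetical new configs, not existing Kafka settings; the point is merely to show where a retry could sit before the IOException is handed to LogDirFailureChannel.
{code:java}
import java.io.IOException

// Sketch only: retry an I/O operation a bounded number of times before
// letting the IOException propagate to the existing failure handling.
def withIORetry[T](maxRetries: Int, backoffMs: Long)(op: => T): T = {
  try op
  catch {
    case _: IOException if maxRetries > 0 =>
      Thread.sleep(backoffMs)   // transient fault: back off, then retry
      withIORetry(maxRetries - 1, backoffMs)(op)
  }
}

// e.g., hypothetically, around the write that failed in the stack trace:
// withIORetry(maxRetries = 3, backoffMs = 50) {
//   records.writeFullyTo(channel)
// }
{code}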

Any comments and suggestions would be appreciated.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14886) Broker request handler thread pool is full due to single request slowdown

2023-04-10 Thread Haoze Wu (Jira)
Haoze Wu created KAFKA-14886:


 Summary: Broker request handler thread pool is full due to single 
request slowdown
 Key: KAFKA-14886
 URL: https://issues.apache.org/jira/browse/KAFKA-14886
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 2.8.0
Reporter: Haoze Wu


In Kafka 2.8.0, we found that the pool of data-plane Kafka request handlers may quickly reach its limit when only one request is stuck. As a result, all other requests that require a data-plane request handler are stuck as well.

When an I/O operation causes a slowdown inside the storeOffsets call at line 777, the thread keeps holding the group lock acquired at line 754.

 
{code:java}
  private def doCommitOffsets(group: GroupMetadata,
                              memberId: String,
                              groupInstanceId: Option[String],
                              generationId: Int,
                              offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                              responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
    group.inLock { // Line 754
      // ...
      groupManager.storeOffsets() // Line 777
      // ...
    }
  } {code}
Its call stack is:

 
{code:java}
kafka.coordinator.group.GroupMetadata,inLock,227
kafka.coordinator.group.GroupCoordinator,handleCommitOffsets,755
kafka.server.KafkaApis,handleOffsetCommitRequest,515
kafka.server.KafkaApis,handle,175
kafka.server.KafkaRequestHandler,run,74
java.lang.Thread,run,748 {code}
This happens when the broker is handling the commit-offset request from the consumer. When the slowdown mentioned above leaves the consumer without a response, the consumer automatically resends the request to the broker. Note that each request from the consumer is handled by a data-plane-kafka-request-handler thread. Therefore, another data-plane-kafka-request-handler thread will also get stuck at line 754 when handling the retried request, because it tries to acquire the very same consumer group lock. The retries occur repeatedly, and none of them can succeed. As a result, the pool of data-plane-kafka-request-handler threads becomes full. Note that this pool of threads is responsible for handling all such requests from all producers and consumers, so all producers and consumers are affected.

The backoff mechanism might be able to mitigate this issue by reducing the number of requests sent in a short time and thus leaving more free slots in the thread pool. Therefore, we increased the backoff config “retry.backoff.ms” to see if the issue disappears; specifically, we increased the retry backoff from 100ms (default) to 1000ms in the consumer’s config. However, we found that the thread pool still fills up, because multiple heartbeat requests take up its slots. All of those heartbeat handlers are stuck acquiring the same consumer group lock, which is already held as described above (acquired at line 754). Specifically, the heartbeat handling is stuck at GroupCoordinator.handleHeartbeat@624:
{code:java}
  def handleHeartbeat(groupId: String,
                      memberId: String,
                      groupInstanceId: Option[String],
                      generationId: Int,
                      responseCallback: Errors => Unit): Unit = {
    // ...
      case Some(group) => group.inLock { // Line 624
        // ...
      }
    // ...
  } {code}
Heartbeat requests are sent from the consumer every 3000ms by default (heartbeat.interval.ms) and have no backoff mechanism, so the data-plane-kafka-request-handler thread pool fills up again quickly.

Fix: 

Instead of waiting for the lock, we can just try to acquire the lock (probably with a time limit). If the acquisition fails, the request can be discarded so that other requests (including the retry of the discarded one) can be processed. However, we feel this fix could affect the semantics of many operations. We would like to hear some suggestions from the community.
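As a rough sketch of what we mean (assuming GroupMetadata keeps guarding its state with a ReentrantLock, as it does today), a time-bounded variant of inLock could look like the following; the helper name and the timeout parameter are hypothetical:
{code:java}
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

// Sketch of a hypothetical time-bounded alternative to GroupMetadata.inLock.
// Returns None when the group lock cannot be acquired within `timeoutMs`,
// so the caller can fail fast (e.g. answer with a retriable error) instead of
// parking a data-plane-kafka-request-handler thread indefinitely.
def tryInLock[T](lock: ReentrantLock, timeoutMs: Long)(fun: => T): Option[T] = {
  if (lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
    try Some(fun)
    finally lock.unlock()
  } else {
    None // caller rejects or discards the request
  }
} {code}
Whether rejecting a commit or heartbeat in this way is acceptable for the coordinator semantics is exactly the part we are unsure about.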



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14882) Uncoordinated states about topic in ZooKeeper nodes and Kafka brokers cause TopicExistsException at client

2023-04-06 Thread Haoze Wu (Jira)
Haoze Wu created KAFKA-14882:


 Summary: Uncoordinated states about topic in ZooKeeper nodes and Kafka brokers cause TopicExistsException at client
 Key: KAFKA-14882
 URL: https://issues.apache.org/jira/browse/KAFKA-14882
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 2.8.0
Reporter: Haoze Wu


We have been testing Kafka 2.8.0. We found some scenarios where TopicExistsException happens, and we feel the design of the topic-creation process in Kafka may sometimes confuse users.

When a client sends a topic-create request to a Kafka broker, the following steps happen:
 # AdminManager checks the topic path in ZooKeeper and throws TopicExistsException if the topic already exists (Kafka sends a request to ZooKeeper).
 # AdminManager creates the topic path in ZooKeeper (Kafka sends a request to ZooKeeper).
 # The controller's ZooKeeper watcher detects the change and enqueues the corresponding event (ZooKeeper sends a notification to Kafka).
 # The event is dequeued and executed (the controller broker sends LeaderAndIsrRequest to the brokers, possibly including itself).
 # The brokers handle the request; a retried create request from the client goes back to step #1.

A symptom we observed is that when step #4 is delayed (stuck for some reason), the client may retry (send the topic-create request again), which triggers TopicExistsException in step #1. However, the topic-create request should behave like a kind of “transaction”: it should have some atomicity and be robust under concurrent topic creation.

After some inspection, we found that it is not easy to implement such a feature in Kafka given the current implementation. But we do have the complaint that the user client gets TopicExistsException when the topic does not actually exist or is not ready yet.

We suggest that maybe we can at least provide some utility that helps users mitigate this issue, for example a tool that helps users clean up the ZooKeeper data and check the consistency of the topic metadata.
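As a very rough sketch of such a utility (Scala; the connection strings are placeholders, /brokers/topics is the standard znode path, and the reconciliation policy is deliberately left open), it could simply diff the topics registered in ZooKeeper against what the brokers report:
{code:java}
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.zookeeper.{WatchedEvent, ZooKeeper}

// Sketch only: report topics that are registered in ZooKeeper metadata but are
// not (yet) visible through the brokers, so an operator can decide whether to
// clean up stale znodes or simply wait for the controller to catch up.
object TopicMetadataChecker {
  def main(args: Array[String]): Unit = {
    val zk = new ZooKeeper("localhost:2181", 30000, (_: WatchedEvent) => ())
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val admin = Admin.create(props)
    try {
      val zkTopics = zk.getChildren("/brokers/topics", false).asScala.toSet
      val brokerTopics = admin.listTopics().names().get().asScala.toSet
      val pending = zkTopics -- brokerTopics
      if (pending.nonEmpty)
        println(s"Topics registered in ZooKeeper but not served by brokers: $pending")
    } finally {
      admin.close()
      zk.close()
    }
  }
} {code}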

We are waiting for feedback from the community. We can provide concrete cases, reproduction scripts, and analysis of the workload if needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-13538) Unexpected TopicExistsException related to Admin#createTopics after broker crash

2021-12-12 Thread Haoze Wu (Jira)
Haoze Wu created KAFKA-13538:


 Summary: Unexpected TopicExistsException related to 
Admin#createTopics after broker crash
 Key: KAFKA-13538
 URL: https://issues.apache.org/jira/browse/KAFKA-13538
 Project: Kafka
  Issue Type: Bug
  Components: build
Affects Versions: 2.8.0
Reporter: Haoze Wu


We were using the official Kafka Java API to create a topic in a Kafka broker 
cluster (3 brokers):
{code:java}
CreateTopicsResult result = admin.createTopics(...);
... = result.all().get(); {code}
The topic we create always has replication factor = 2 and partitions = 2. If one of the brokers crashes for some reason and the client happens to send the topic-creation request to this crashed broker, we usually observe that the client suffers a delay of a few seconds due to the disconnection, and then automatically connects to another broker and creates the topic there. Everything happens automatically inside the client, underneath `admin.createTopics(...)` and `result.all().get()`.

However, we found that sometimes we got `TopicExistsException` from 
`result.all().get()`, but we had never created this topic beforehand.

After some investigation of the client source code, we found that this issue happens in the following way:
 # The client connects to a broker (say, broker X) and then sends the topic-creation request.
 # This topic has replication factor = 2 and partitions = 2, so broker X may inform another broker of this information.
 # Broker X suddenly crashes for some reason, and the response for the topic-creation request has not been sent back to the client.
 # The client eventually learns that broker X has crashed, but it never gets the response for the topic-creation request. The client therefore assumes the request failed, connects to another broker (say, broker Y), and sends the topic-creation request again.
 # The topic-creation request (with replication factor = 2 and partitions = 2) had been partially executed before broker X crashed, so broker Y may already have done some of the work on behalf of broker X; for example, broker Y already holds some metadata about this topic. Therefore, when broker Y runs its sanity check against the metadata, it finds that the topic exists and directly returns `TopicExistsException` as the response.
 # The client receives `TopicExistsException`, takes it at face value, and the exception is thrown back to the user from `result.all().get()`.

There are two diagrams illustrating these six steps (images not included in this message).

Now the core question is whether this workflow violates the semantics & design of the Kafka client API. We read the “Create Topics Response” section in KIP-4 ([https://cwiki.apache.org/confluence/display/kafka/kip-4+-+command+line+and+centralized+administrative+operations]). The description in KIP-4 focuses on batched topic-creation requests and how the entries are handled independently; it does not say how the client should deal with the buggy scenario described above.

According to “common sense”, we think the client should be able to recognize that the metadata existing on broker Y was actually created by the client itself via the crashed broker X, and the client should not throw `TopicExistsException` to the user in that case.
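For now, the only client-side workaround we see is to second-guess the exception ourselves, roughly as in the sketch below (the helper name and the fixed partition/replication values are just placeholders for our setup): on TopicExistsException after a retried create, describe the topic and treat a matching description as success.
{code:java}
import java.util.Collections
import java.util.concurrent.ExecutionException
import org.apache.kafka.clients.admin.{Admin, NewTopic}
import org.apache.kafka.common.errors.TopicExistsException

// Sketch of a client-side workaround: if a (possibly retried) createTopics call
// fails with TopicExistsException, check whether the topic actually exists with
// the expected layout instead of propagating the exception to the user.
def createTopicIdempotently(admin: Admin, name: String): Unit = {
  val newTopic = new NewTopic(name, 2, 2.toShort) // partitions = 2, RF = 2
  try {
    admin.createTopics(Collections.singleton(newTopic)).all().get()
  } catch {
    case e: ExecutionException if e.getCause.isInstanceOf[TopicExistsException] =>
      // The "existing" topic may well be our own earlier, partially completed
      // request via the crashed broker X; verify it before giving up.
      val desc = admin.describeTopics(Collections.singleton(name)).all().get().get(name)
      require(desc.partitions().size() == 2, s"unexpected partition count for topic $name")
  }
} {code}
This only papers over the ambiguity, though; the client still cannot tell whether the topic was created by itself or by someone else.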



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13468) Consumers may hang because IOException in Log#<init> does not trigger KafkaStorageException

2021-11-19 Thread Haoze Wu (Jira)
Haoze Wu created KAFKA-13468:


 Summary: Consumers may hang because IOException in Log#<init> does not trigger KafkaStorageException
 Key: KAFKA-13468
 URL: https://issues.apache.org/jira/browse/KAFKA-13468
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 2.8.0
Reporter: Haoze Wu


When the Kafka Log class (`core/src/main/scala/kafka/log/Log.scala`) is initialized, it may encounter an IOException in the `locally` block, e.g., when the log directory cannot be created due to a permission issue, or when `initializeLeaderEpochCache`, `initializePartitionMetadata`, etc. throw an IOException.

 
{code:java}
class Log(...) {
  // ...
  locally {
    // create the log directory if it doesn't exist
    Files.createDirectories(dir.toPath)

    initializeLeaderEpochCache()
    initializePartitionMetadata()

    val nextOffset = loadSegments()
    // ...
  }
  // ...
}{code}
We found that the broker encountering the IOException prints a KafkaApis error log like the following and then proceeds.

 
{code:java}
[2021-11-17 22:41:30,057] ERROR [KafkaApi-1] Error when handling request: 
clientId=1, correlationId=1, api=LEADER_AND_ISR, version=5, 
body=LeaderAndIsrRequestData(controllerId=1, controllerEpoch=1, 
brokerEpoch=4294967362, type=0, ungroupedPartitionStates=[], 
topicStates=[LeaderAndIsrTopicState(topicName='gray-2-0', 
topicId=573bAVHfRQeXApzAKevNIg, 
partitionStates=[LeaderAndIsrPartitionState(topicName='gray-2-0', 
partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1, 3], 
zkVersion=0, replicas=[1, 3], addingReplicas=[], removingReplicas=[], 
isNew=true)]), LeaderAndIsrTopicState(topicName='gray-1-0', 
topicId=12dW2FxLTiyKmGi41HhdZQ, 
partitionStates=[LeaderAndIsrPartitionState(topicName='gray-1-0', 
partitionIndex=1, controllerEpoch=1, leader=3, leaderEpoch=0, isr=[3, 1], 
zkVersion=0, replicas=[3, 1], addingReplicas=[], removingReplicas=[], 
isNew=true)]), LeaderAndIsrTopicState(topicName='gray-3-0', 
topicId=_yvmANyZSoK_PTV0e-nqCA, 
partitionStates=[LeaderAndIsrPartitionState(topicName='gray-3-0', 
partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1, 3], 
zkVersion=0, replicas=[1, 3], addingReplicas=[], removingReplicas=[], 
isNew=true)])], liveLeaders=[LeaderAndIsrLiveLeader(brokerId=1, 
hostName='localhost', port=9791), LeaderAndIsrLiveLeader(brokerId=3, 
hostName='localhost', port=9793)]) (kafka.server.RequestHandlerHelper) {code}
But all the consumers consuming data from the affected topics (“gray-2-0”, “gray-1-0”, “gray-3-0”) are unable to proceed. These consumers do not log any error related to this issue; they hang for more than 3 minutes.

 

The IOException sometimes affects multiple partitions of the __consumer_offsets topic:

 
{code:java}
[2021-11-18 10:57:41,289] ERROR [KafkaApi-1] Error when handling request: 
clientId=1, correlationId=11, api=LEADER_AND_ISR, version=5, 
body=LeaderAndIsrRequestData(controllerId=1, controllerEpoch=1, 
brokerEpoch=4294967355, type=0, ungroupedPartitionStates=[], 
topicStates=[LeaderAndIsrTopicState(topicName='__consumer_offsets', 
topicId=_MiMTCViS76osIyDdxekIg, 
partitionStates=[LeaderAndIsrPartitionState(topicName='__consumer_offsets', 
partitionIndex=15, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], 
zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), 
LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=48, 
controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], 
addingReplicas=[], removingReplicas=[], isNew=true), 
LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=45, 
controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], 
addingReplicas=[], removingReplicas=[], isNew=true), ...
addingReplicas=[], removingReplicas=[], isNew=true), 
LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=33, 
controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], 
addingReplicas=[], removingReplicas=[], isNew=true)])], 
liveLeaders=[LeaderAndIsrLiveLeader(brokerId=1, hostName='localhost', 
port=9791)]) (kafka.server.RequestHandlerHelper) {code}
*Analysis*

 

The key stacktrace is as follows:
{code:java}
"java.lang.Thread,run,748",
"kafka.server.KafkaRequestHandler,run,74",
"kafka.server.KafkaApis,handle,236",
"kafka.server.KafkaApis,handleLeaderAndIsrRequest,258",
"kafka.server.ReplicaManager,becomeLeaderOrFollower,1411",
"kafka.server.ReplicaManager,makeLeaders,1566",
"scala.collection.mutable.HashMap,foreachEntry,499",
"scala.collection.mutable.HashMap$Node,foreachEntry,633",
"kafka.utils.Implicits$MapExtensionMethods$,$anonfun$forKeyValue$1,62",
"kafka.server.ReplicaManager,$anonfun$makeLeaders$5,1568",
"kafka.cluster.Partition,makeLeader,548",
"kafka.cluster.Partition,$anonfun$makeLeader$1,564",
"kafka.cluster.Partition,createLogIfNotExists,324",

[jira] [Created] (KAFKA-13457) socketChannel in Acceptor#accept is not closed upon IOException

2021-11-15 Thread Haoze Wu (Jira)
Haoze Wu created KAFKA-13457:


 Summary: socketChannel in Acceptor#accept is not closed upon 
IOException
 Key: KAFKA-13457
 URL: https://issues.apache.org/jira/browse/KAFKA-13457
 Project: Kafka
  Issue Type: Bug
  Components: network
Affects Versions: 2.8.0
Reporter: Haoze Wu


When kafka.network.Acceptor in SocketServer.scala accepts a new connection in the `accept` function, it handles `TooManyConnectionsException` and `ConnectionThrottledException`. However, line 717 or the socketChannel operations within the try block may throw an IOException as well, which is not handled.

 
{code:java}
//core/src/main/scala/kafka/network/SocketServer.scala
// Acceptor class
  private def accept(key: SelectionKey): Option[SocketChannel] = {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()     // line 717
    try {
      connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setKeepAlive(true)
      if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socketChannel.socket().setSendBufferSize(sendBufferSize)
      Some(socketChannel)
    } catch {
      case e: TooManyConnectionsException =>
        info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
        close(endPoint.listenerName, socketChannel)
        None
      case e: ConnectionThrottledException =>
        val ip = socketChannel.socket.getInetAddress
        debug(s"Delaying closing of connection from $ip for ${e.throttleTimeMs} ms")
        val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
        throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
        None
    }
  }
{code}
This IOException is caught by the caller `acceptNewConnections` at line 706, which only prints an error message. The socketChannel on which the IOException was thrown is never closed.

 
{code:java}
//core/src/main/scala/kafka/network/SocketServer.scala
  private def acceptNewConnections(): Unit = {
    val ready = nioSelector.select(500)
    if (ready > 0) {
      val keys = nioSelector.selectedKeys()
      val iter = keys.iterator()
      while (iter.hasNext && isRunning) {
        try {
          val key = iter.next
          iter.remove()
          if (key.isAcceptable) {
            accept(key).foreach { socketChannel =>
              // ... pick a processor, then hand the channel over:
              do {
                // ...
              } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
            }
          } else
            throw new IllegalStateException("Unrecognized key state for acceptor thread.")
        } catch {
          case e: Throwable => error("Error while accepting connection", e)   // line 706
        }
      }
    }
  }
{code}
During testing we found that this causes our Kafka clients to see errors (InvalidReplicationFactorException) for 40+ seconds when creating new topics; after those 40 seconds, the clients are able to create new topics successfully.

We verified that after adding a socketChannel.close() upon IOException, the symptom disappears, so the clients do not need to wait 40 seconds before working again.
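Concretely, the change we tested amounts to adding an IOException case to the catch block in Acceptor#accept, roughly as sketched below (exact log level and message are up for discussion; a java.io.IOException import is needed if not already present):
{code:java}
      // Additional case in the catch block of Acceptor#accept (sketch):
      case e: IOException =>
        // The channel was accepted but could not be configured; close it so it
        // does not linger and keep holding a connection-quota slot.
        error("Error while configuring accepted connection, closing it", e)
        close(endPoint.listenerName, socketChannel)
        None
{code}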

 

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)